Skip to content

Commit

Permalink
[GLUTEN-8187][VL] Support velox cache metrics (#8188)
Browse files Browse the repository at this point in the history
Velox backend supports cache functionality controlled by the config spark.gluten.sql.columnar.backend.velox.cacheEnabled, When the cache is enabled, we want to measure its effectiveness through a series of metrics.
  • Loading branch information
yikf authored Dec 13, 2024
1 parent 25e212f commit dd72810
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public class Metrics implements IMetrics {
public long[] processedStrides;
public long[] remainingFilterTime;
public long[] ioWaitTime;
public long[] storageReadBytes;
public long[] localReadBytes;
public long[] ramReadBytes;
public long[] preloadSplits;

public long[] physicalWrittenBytes;
Expand Down Expand Up @@ -88,6 +91,9 @@ public Metrics(
long[] processedStrides,
long[] remainingFilterTime,
long[] ioWaitTime,
long[] storageReadBytes,
long[] localReadBytes,
long[] ramReadBytes,
long[] preloadSplits,
long[] physicalWrittenBytes,
long[] writeIOTime,
Expand Down Expand Up @@ -122,6 +128,9 @@ public Metrics(
this.processedStrides = processedStrides;
this.remainingFilterTime = remainingFilterTime;
this.ioWaitTime = ioWaitTime;
this.storageReadBytes = storageReadBytes;
this.localReadBytes = localReadBytes;
this.ramReadBytes = ramReadBytes;
this.preloadSplits = preloadSplits;
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
Expand Down Expand Up @@ -163,6 +172,9 @@ public OperatorMetrics getOperatorMetrics(int index) {
processedStrides[index],
remainingFilterTime[index],
ioWaitTime[index],
storageReadBytes[index],
localReadBytes[index],
ramReadBytes[index],
preloadSplits[index],
physicalWrittenBytes[index],
writeIOTime[index],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class OperatorMetrics implements IOperatorMetrics {
public long processedStrides;
public long remainingFilterTime;
public long ioWaitTime;
public long storageReadBytes;
public long localReadBytes;
public long ramReadBytes;
public long preloadSplits;

public long physicalWrittenBytes;
Expand Down Expand Up @@ -83,6 +86,9 @@ public OperatorMetrics(
long processedStrides,
long remainingFilterTime,
long ioWaitTime,
long storageReadBytes,
long localReadBytes,
long ramReadBytes,
long preloadSplits,
long physicalWrittenBytes,
long writeIOTime,
Expand Down Expand Up @@ -116,6 +122,9 @@ public OperatorMetrics(
this.processedStrides = processedStrides;
this.remainingFilterTime = remainingFilterTime;
this.ioWaitTime = ioWaitTime;
this.storageReadBytes = storageReadBytes;
this.localReadBytes = localReadBytes;
this.ramReadBytes = ramReadBytes;
this.preloadSplits = preloadSplits;
this.physicalWrittenBytes = physicalWrittenBytes;
this.writeIOTime = writeIOTime;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"remaining filter time"),
"ioWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "io wait time")
"ioWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "io wait time"),
"storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
"localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
)

override def genBatchScanTransformerMetricsUpdater(
Expand Down Expand Up @@ -138,7 +141,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"remaining filter time"),
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time")
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"),
"storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
"localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
)

override def genHiveTableScanTransformerMetricsUpdater(
Expand Down Expand Up @@ -176,7 +182,10 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"remaining filter time"),
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time")
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"),
"storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
"localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
)

override def genFileSourceScanTransformerMetricsUpdater(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri
metrics("processedStrides") += operatorMetrics.processedStrides
metrics("remainingFilterTime") += operatorMetrics.remainingFilterTime
metrics("ioWaitTime") += operatorMetrics.ioWaitTime
metrics("storageReadBytes") += operatorMetrics.storageReadBytes
metrics("localReadBytes") += operatorMetrics.localReadBytes
metrics("ramReadBytes") += operatorMetrics.ramReadBytes
metrics("preloadSplits") += operatorMetrics.preloadSplits
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
val processedStrides: SQLMetric = metrics("processedStrides")
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
val ioWaitTime: SQLMetric = metrics("ioWaitTime")
val storageReadBytes: SQLMetric = metrics("storageReadBytes")
val localReadBytes: SQLMetric = metrics("localReadBytes")
val ramReadBytes: SQLMetric = metrics("ramReadBytes")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
Expand Down Expand Up @@ -73,6 +76,9 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric
processedStrides += operatorMetrics.processedStrides
remainingFilterTime += operatorMetrics.remainingFilterTime
ioWaitTime += operatorMetrics.ioWaitTime
storageReadBytes += operatorMetrics.storageReadBytes
localReadBytes += operatorMetrics.localReadBytes
ramReadBytes += operatorMetrics.ramReadBytes
preloadSplits += operatorMetrics.preloadSplits
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
val processedStrides: SQLMetric = metrics("processedStrides")
val remainingFilterTime: SQLMetric = metrics("remainingFilterTime")
val ioWaitTime: SQLMetric = metrics("ioWaitTime")
val storageReadBytes: SQLMetric = metrics("storageReadBytes")
val localReadBytes: SQLMetric = metrics("localReadBytes")
val ramReadBytes: SQLMetric = metrics("ramReadBytes")

override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = {
inputMetrics.bridgeIncBytesRead(rawInputBytes.value)
Expand Down Expand Up @@ -68,6 +71,9 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]
processedStrides += operatorMetrics.processedStrides
remainingFilterTime += operatorMetrics.remainingFilterTime
ioWaitTime += operatorMetrics.ioWaitTime
storageReadBytes += operatorMetrics.storageReadBytes
localReadBytes += operatorMetrics.localReadBytes
ramReadBytes += operatorMetrics.ramReadBytes
preloadSplits += operatorMetrics.preloadSplits
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ object MetricsUtil extends Logging {
var processedStrides: Long = 0
var remainingFilterTime: Long = 0
var ioWaitTime: Long = 0
var storageReadBytes: Long = 0
var localReadBytes: Long = 0
var ramReadBytes: Long = 0
var preloadSplits: Long = 0
var numWrittenFiles: Long = 0

Expand Down Expand Up @@ -151,6 +154,9 @@ object MetricsUtil extends Logging {
processedStrides += metrics.processedStrides
remainingFilterTime += metrics.remainingFilterTime
ioWaitTime += metrics.ioWaitTime
storageReadBytes += metrics.storageReadBytes
localReadBytes += metrics.localReadBytes
ramReadBytes += metrics.ramReadBytes
preloadSplits += metrics.preloadSplits
numWrittenFiles += metrics.numWrittenFiles
}
Expand Down Expand Up @@ -185,6 +191,9 @@ object MetricsUtil extends Logging {
processedStrides,
remainingFilterTime,
ioWaitTime,
storageReadBytes,
localReadBytes,
ramReadBytes,
preloadSplits,
physicalWrittenBytes,
writeIOTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,4 +263,16 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
}
}
}

test("Velox cache metrics") {
val df = spark.sql(s"SELECT * FROM metrics_t1")
val scans = collect(df.queryExecution.executedPlan) {
case scan: FileSourceScanExecTransformer => scan
}
df.collect()
assert(scans.length === 1)
val metrics = scans.head.metrics
assert(metrics("storageReadBytes").value > 0)
assert(metrics("ramReadBytes").value == 0)
}
}
8 changes: 7 additions & 1 deletion cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;");

metricsBuilderConstructor = getMethodIdOrError(
env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");
env,
metricsBuilderClass,
"<init>",
"([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V");

nativeColumnarToRowInfoClass =
createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;");
Expand Down Expand Up @@ -574,6 +577,9 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_metrics_IteratorMetricsJniWrapp
longArray[Metrics::kProcessedStrides],
longArray[Metrics::kRemainingFilterTime],
longArray[Metrics::kIoWaitTime],
longArray[Metrics::kStorageReadBytes],
longArray[Metrics::kLocalReadBytes],
longArray[Metrics::kRamReadBytes],
longArray[Metrics::kPreloadSplits],
longArray[Metrics::kPhysicalWrittenBytes],
longArray[Metrics::kWriteIOTime],
Expand Down
3 changes: 3 additions & 0 deletions cpp/core/utils/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ struct Metrics {
kProcessedStrides,
kRemainingFilterTime,
kIoWaitTime,
kStorageReadBytes,
kLocalReadBytes,
kRamReadBytes,
kPreloadSplits,

// Write metrics.
Expand Down
7 changes: 7 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const std::string kSkippedStrides = "skippedStrides";
const std::string kProcessedStrides = "processedStrides";
const std::string kRemainingFilterTime = "totalRemainingFilterTime";
const std::string kIoWaitTime = "ioWaitWallNanos";
const std::string kStorageReadBytes = "storageReadBytes";
const std::string kLocalReadBytes = "localReadBytes";
const std::string kRamReadBytes = "ramReadBytes";
const std::string kPreloadSplits = "readyPreloadedSplits";
const std::string kNumWrittenFiles = "numWrittenFiles";
const std::string kWriteIOTime = "writeIOTime";
Expand Down Expand Up @@ -422,6 +425,10 @@ void WholeStageResultIterator::collectMetrics() {
metrics_->get(Metrics::kRemainingFilterTime)[metricIndex] =
runtimeMetric("sum", second->customStats, kRemainingFilterTime);
metrics_->get(Metrics::kIoWaitTime)[metricIndex] = runtimeMetric("sum", second->customStats, kIoWaitTime);
metrics_->get(Metrics::kStorageReadBytes)[metricIndex] =
runtimeMetric("sum", second->customStats, kStorageReadBytes);
metrics_->get(Metrics::kLocalReadBytes)[metricIndex] = runtimeMetric("sum", second->customStats, kLocalReadBytes);
metrics_->get(Metrics::kRamReadBytes)[metricIndex] = runtimeMetric("sum", second->customStats, kRamReadBytes);
metrics_->get(Metrics::kPreloadSplits)[metricIndex] =
runtimeMetric("sum", entry.second->customStats, kPreloadSplits);
metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] =
Expand Down

0 comments on commit dd72810

Please sign in to comment.