diff --git a/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/FakeRow.scala b/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/FakeRow.scala
new file mode 100644
index 000000000..fe09fde28
--- /dev/null
+++ b/arrow-data-source/common/src/main/scala/com/intel/oap/spark/sql/FakeRow.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.intel.oap.spark.sql
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+class FakeRow(val batch: ColumnarBatch) extends InternalRow {
+  override def numFields: Int = throw new UnsupportedOperationException()
+  override def setNullAt(i: Int): Unit = throw new UnsupportedOperationException()
+  override def update(i: Int, value: Any): Unit = throw new UnsupportedOperationException()
+  override def copy(): InternalRow = throw new UnsupportedOperationException()
+  override def isNullAt(ordinal: Int): Boolean = throw new UnsupportedOperationException()
+  override def getBoolean(ordinal: Int): Boolean = throw new UnsupportedOperationException()
+  override def getByte(ordinal: Int): Byte = throw new UnsupportedOperationException()
+  override def getShort(ordinal: Int): Short = throw new UnsupportedOperationException()
+  override def getInt(ordinal: Int): Int = throw new UnsupportedOperationException()
+  override def getLong(ordinal: Int): Long = throw new UnsupportedOperationException()
+  override def getFloat(ordinal: Int): Float = throw new UnsupportedOperationException()
+  override def getDouble(ordinal: Int): Double = throw new UnsupportedOperationException()
+  override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal =
+    throw new UnsupportedOperationException()
+  override def getUTF8String(ordinal: Int): UTF8String =
+    throw new UnsupportedOperationException()
+  override def getBinary(ordinal: Int): Array[Byte] = throw new UnsupportedOperationException()
+  override def getInterval(ordinal: Int): CalendarInterval =
+    throw new UnsupportedOperationException()
+  override def getStruct(ordinal: Int, numFields: Int): InternalRow =
+    throw new UnsupportedOperationException()
+  override def getArray(ordinal: Int): ArrayData = throw new UnsupportedOperationException()
+  override def getMap(ordinal: Int): MapData = throw new UnsupportedOperationException()
+  override def get(ordinal: Int, dataType: DataType): AnyRef =
+    throw new UnsupportedOperationException()
+}
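Note: FakeRow is only a carrier type; it smuggles a whole Arrow ColumnarBatch through Spark's row-based write path, and every InternalRow accessor deliberately throws. A minimal sketch (not part of this patch) of how a write-side consumer might unwrap it; the object name and the writeBatch callback are illustrative placeholders:

    import com.intel.oap.spark.sql.FakeRow
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.vectorized.ColumnarBatch

    object FakeRowSink {
      // Illustrative only: a sink that receives InternalRow from the write path
      // but actually expects whole Arrow batches wrapped in FakeRow.
      def consume(row: InternalRow)(writeBatch: ColumnarBatch => Unit): Unit = row match {
        case fake: FakeRow =>
          writeBatch(fake.batch) // unwrap and hand over the whole batch
        case other =>
          throw new IllegalStateException(
            s"Expected FakeRow but got ${other.getClass.getSimpleName}")
      }
    }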
diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowWriteExtension.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowWriteExtension.scala
index 1dd4c968d..2e7b14bed 100644
--- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowWriteExtension.scala
+++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/ArrowWriteExtension.scala
@@ -31,8 +31,6 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.logical.OrderPreservingUnaryNode
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.catalyst.util.MapData
 import org.apache.spark.sql.execution.ColumnarRule
 import org.apache.spark.sql.execution.ColumnarToRowExec
 import org.apache.spark.sql.execution.ColumnarToRowTransition
@@ -41,11 +39,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.types.Decimal
-import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.unsafe.types.CalendarInterval
-import org.apache.spark.unsafe.types.UTF8String
 
 class ArrowWriteExtension extends (SparkSessionExtensions => Unit) {
   def apply(e: SparkSessionExtensions): Unit = {
@@ -107,34 +100,6 @@ object ArrowWriteExtension {
     }
   }
 
-  class FakeRow(val batch: ColumnarBatch) extends InternalRow {
-    override def numFields: Int = throw new UnsupportedOperationException()
-    override def setNullAt(i: Int): Unit = throw new UnsupportedOperationException()
-    override def update(i: Int, value: Any): Unit = throw new UnsupportedOperationException()
-    override def copy(): InternalRow = throw new UnsupportedOperationException()
-    override def isNullAt(ordinal: Int): Boolean = throw new UnsupportedOperationException()
-    override def getBoolean(ordinal: Int): Boolean = throw new UnsupportedOperationException()
-    override def getByte(ordinal: Int): Byte = throw new UnsupportedOperationException()
-    override def getShort(ordinal: Int): Short = throw new UnsupportedOperationException()
-    override def getInt(ordinal: Int): Int = throw new UnsupportedOperationException()
-    override def getLong(ordinal: Int): Long = throw new UnsupportedOperationException()
-    override def getFloat(ordinal: Int): Float = throw new UnsupportedOperationException()
-    override def getDouble(ordinal: Int): Double = throw new UnsupportedOperationException()
-    override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal =
-      throw new UnsupportedOperationException()
-    override def getUTF8String(ordinal: Int): UTF8String =
-      throw new UnsupportedOperationException()
-    override def getBinary(ordinal: Int): Array[Byte] = throw new UnsupportedOperationException()
-    override def getInterval(ordinal: Int): CalendarInterval =
-      throw new UnsupportedOperationException()
-    override def getStruct(ordinal: Int, numFields: Int): InternalRow =
-      throw new UnsupportedOperationException()
-    override def getArray(ordinal: Int): ArrayData = throw new UnsupportedOperationException()
-    override def getMap(ordinal: Int): MapData = throw new UnsupportedOperationException()
-    override def get(ordinal: Int, dataType: DataType): AnyRef =
-      throw new UnsupportedOperationException()
-  }
-
   private case class ColumnarToFakeRowLogicAdaptor(child: LogicalPlan)
     extends OrderPreservingUnaryNode {
     override def output: Seq[Attribute] = child.output
diff --git a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala
index c7f623ba9..e0a4a551e 100644
--- a/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala
+++ b/arrow-data-source/standard/src/main/scala/com/intel/oap/spark/sql/execution/datasources/arrow/ArrowFileFormat.scala
@@ -22,8 +22,8 @@ import java.net.URLDecoder
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import com.intel.oap.spark.sql.ArrowWriteExtension.FakeRow
 import com.intel.oap.spark.sql.ArrowWriteQueue
+import com.intel.oap.spark.sql.FakeRow
 import com.intel.oap.spark.sql.execution.datasources.v2.arrow.{ArrowFilters, ArrowOptions, ArrowUtils}
 import com.intel.oap.spark.sql.execution.datasources.v2.arrow.ArrowSQLConf._
 import com.intel.oap.vectorized.ArrowWritableColumnVector
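Note: ArrowWriteExtension is an ordinary SparkSessionExtensions injector (it extends SparkSessionExtensions => Unit), so the FakeRow-based write path above is presumably switched on through the standard extension config. A sketch, e.g. for spark-shell or a test session; the master and app name are illustrative:

    import org.apache.spark.sql.SparkSession

    // Register the extension so its columnar rules (including the FakeRow adaptor)
    // are injected into every session.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("arrow-write-example")
      .config("spark.sql.extensions", "com.intel.oap.spark.sql.ArrowWriteExtension")
      .getOrCreate()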
diff --git a/shims/spark311/src/main/scala/org/apache/spark/sql/execution/datasources/BaseWriteStatsTracker.scala b/shims/spark311/src/main/scala/org/apache/spark/sql/execution/datasources/BaseWriteStatsTracker.scala
new file mode 100644
index 000000000..e79e661e9
--- /dev/null
+++ b/shims/spark311/src/main/scala/org/apache/spark/sql/execution/datasources/BaseWriteStatsTracker.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.FileNotFoundException
+
+import scala.collection.mutable
+
+import com.intel.oap.spark.sql.FakeRow
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * Simple metrics collected during an instance of [[FileFormatDataWriter]].
+ * These were first introduced in https://github.com/apache/spark/pull/18159 (SPARK-20703).
+ *
+ * We changed the newRow method to support writes with FakeRow.
+ */
+case class BasicWriteTaskStats(
+    partitions: Seq[InternalRow],
+    numFiles: Int,
+    numBytes: Long,
+    numRows: Long)
+  extends WriteTaskStats
+
+
+/**
+ * Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
+ */
+class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
+  extends WriteTaskStatsTracker with Logging {
+
+  private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
+  private[this] var numFiles: Int = 0
+  private[this] var submittedFiles: Int = 0
+  private[this] var numBytes: Long = 0L
+  private[this] var numRows: Long = 0L
+
+  private[this] var curFile: Option[String] = None
+
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * @param filePath path to the file
+   * @return the file size or None if the file was not found.
+   */
+  private def getFileSize(filePath: String): Option[Long] = {
+    val path = new Path(filePath)
+    val fs = path.getFileSystem(hadoopConf)
+    try {
+      Some(fs.getFileStatus(path).getLen())
+    } catch {
+      case e: FileNotFoundException =>
+        // may arise against eventually consistent object stores
+        logDebug(s"File $path is not yet visible", e)
+        None
+    }
+  }
+
+
+  override def newPartition(partitionValues: InternalRow): Unit = {
+    partitions.append(partitionValues)
+  }
+
+  override def newBucket(bucketId: Int): Unit = {
+    // currently unhandled
+  }
+
+  override def newFile(filePath: String): Unit = {
+    statCurrentFile()
+    curFile = Some(filePath)
+    submittedFiles += 1
+  }
+
+  private def statCurrentFile(): Unit = {
+    curFile.foreach { path =>
+      getFileSize(path).foreach { len =>
+        numBytes += len
+        numFiles += 1
+      }
+      curFile = None
+    }
+  }
+
+  override def newRow(row: InternalRow): Unit = row match {
+    case fake: FakeRow =>
+      numRows += fake.batch.numRows()
+    case _ =>
+      numRows += 1
+  }
+
+  override def getFinalStats(): WriteTaskStats = {
+    statCurrentFile()
+
+    // Reports bytesWritten and recordsWritten to the Spark output metrics.
+    Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
+      outputMetrics.setBytesWritten(numBytes)
+      outputMetrics.setRecordsWritten(numRows)
+    }
+
+    if (submittedFiles != numFiles) {
+      logInfo(s"Expected $submittedFiles files, but only saw $numFiles. " +
+        "This could be due to the output format not writing empty files, " +
+        "or files being not immediately visible in the filesystem.")
+    }
+    BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows)
+  }
+}
+
+
+/**
+ * Simple [[WriteJobStatsTracker]] implementation that's serializable, capable of
+ * instantiating [[BasicWriteTaskStatsTracker]] on executors and processing the
+ * [[BasicWriteTaskStats]] they produce by aggregating the metrics and posting them
+ * as DriverMetricUpdates.
+ */
+class BasicWriteJobStatsTracker(
+    serializableHadoopConf: SerializableConfiguration,
+    @transient val metrics: Map[String, SQLMetric])
+  extends WriteJobStatsTracker {
+
+  override def newTaskInstance(): WriteTaskStatsTracker = {
+    new BasicWriteTaskStatsTracker(serializableHadoopConf.value)
+  }
+
+  override def processStats(stats: Seq[WriteTaskStats]): Unit = {
+    val sparkContext = SparkContext.getActive.get
+    var partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty
+    var numFiles: Long = 0L
+    var totalNumBytes: Long = 0L
+    var totalNumOutput: Long = 0L
+
+    val basicStats = stats.map(_.asInstanceOf[BasicWriteTaskStats])
+
+    basicStats.foreach { summary =>
+      partitionsSet ++= summary.partitions
+      numFiles += summary.numFiles
+      totalNumBytes += summary.numBytes
+      totalNumOutput += summary.numRows
+    }
+
+    metrics(BasicWriteJobStatsTracker.NUM_FILES_KEY).add(numFiles)
+    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
+    metrics(BasicWriteJobStatsTracker.NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
+    metrics(BasicWriteJobStatsTracker.NUM_PARTS_KEY).add(partitionsSet.size)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toList)
+  }
+}
+
+object BasicWriteJobStatsTracker {
+  private val NUM_FILES_KEY = "numFiles"
+  private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
+  private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
+  private val NUM_PARTS_KEY = "numParts"
+
+  def metrics: Map[String, SQLMetric] = {
+    val sparkContext = SparkContext.getActive.get
+    Map(
+      NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
+      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
+      NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
+    )
+  }
+}
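Note: the shim keeps vanilla Spark's wiring, where a write command builds one job-level tracker on the driver and ships it to executors, each write task getting its own BasicWriteTaskStatsTracker from newTaskInstance(). A rough usage sketch against the Spark 3.1.1 class above; the session setup is an assumption for illustration:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
    import org.apache.spark.util.SerializableConfiguration

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val serializableConf =
      new SerializableConfiguration(spark.sparkContext.hadoopConfiguration)

    // Driver-side SQL metrics plus a serializable job tracker; executors later call
    // newTaskInstance() to obtain one BasicWriteTaskStatsTracker per write task.
    val jobTracker =
      new BasicWriteJobStatsTracker(serializableConf, BasicWriteJobStatsTracker.metrics)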
diff --git a/shims/spark321/src/main/scala/org/apache/spark/sql/execution/datasources/BaseWriteStatsTracker.scala b/shims/spark321/src/main/scala/org/apache/spark/sql/execution/datasources/BaseWriteStatsTracker.scala
new file mode 100644
index 000000000..65c9f200c
--- /dev/null
+++ b/shims/spark321/src/main/scala/org/apache/spark/sql/execution/datasources/BaseWriteStatsTracker.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.FileNotFoundException
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+
+import com.intel.oap.spark.sql.FakeRow
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * Simple metrics collected during an instance of [[FileFormatDataWriter]].
+ * These were first introduced in https://github.com/apache/spark/pull/18159 (SPARK-20703).
+ *
+ * We changed the newRow method to support writes with FakeRow.
+ */
+case class BasicWriteTaskStats(
+    partitions: Seq[InternalRow],
+    numFiles: Int,
+    numBytes: Long,
+    numRows: Long)
+  extends WriteTaskStats
+
+
+/**
+ * Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
+ */
+class BasicWriteTaskStatsTracker(
+    hadoopConf: Configuration,
+    taskCommitTimeMetric: Option[SQLMetric] = None)
+  extends WriteTaskStatsTracker with Logging {
+
+  private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
+  private[this] var numFiles: Int = 0
+  private[this] var numSubmittedFiles: Int = 0
+  private[this] var numBytes: Long = 0L
+  private[this] var numRows: Long = 0L
+
+  private[this] val submittedFiles = mutable.HashSet[String]()
+
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * @param filePath path to the file
+   * @return the file size or None if the file was not found.
+   */
+  private def getFileSize(filePath: String): Option[Long] = {
+    val path = new Path(filePath)
+    val fs = path.getFileSystem(hadoopConf)
+    getFileSize(fs, path)
+  }
+
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * This supports the XAttr in HADOOP-17414 when the "magic committer" adds
+   * a custom HTTP header to the zero byte marker.
+   * If the length returned by getFileStatus is > 0 then that length is
+   * returned. For zero-byte files, the (optional) Hadoop FS API getXAttr() is
+   * invoked. If a parseable, non-negative length can be retrieved, it is
+   * returned instead of the zero length.
+   * @return the file size or None if the file was not found.
+   */
+  private [datasources] def getFileSize(fs: FileSystem, path: Path): Option[Long] = {
+    // the normal file status probe.
+    try {
+      val len = fs.getFileStatus(path).getLen
+      if (len > 0) {
+        return Some(len)
+      }
+    } catch {
+      case e: FileNotFoundException =>
+        // may arise against eventually consistent object stores.
+        logDebug(s"File $path is not yet visible", e)
+        return None
+    }
+
+    // Output File Size is 0. Look to see if it has an attribute
+    // declaring a future-file-length.
+    // Failure of API call, parsing, invalid value all return the
+    // 0 byte length.
+
+    var len = 0L
+    try {
+      val attr = fs.getXAttr(path, BasicWriteJobStatsTracker.FILE_LENGTH_XATTR)
+      if (attr != null && attr.nonEmpty) {
+        val str = new String(attr, StandardCharsets.UTF_8)
+        logDebug(s"File Length statistics for $path retrieved from XAttr: $str")
+        // a non-empty header was found. parse to a long via the java class
+        val l = java.lang.Long.parseLong(str)
+        if (l > 0) {
+          len = l
+        } else {
+          logDebug("Ignoring negative value in XAttr file length")
+        }
+      }
+    } catch {
+      case e: NumberFormatException =>
+        // warn but don't dump the whole stack
+        logInfo(s"Failed to parse" +
+          s" ${BasicWriteJobStatsTracker.FILE_LENGTH_XATTR}:$e;" +
+          s" bytes written may be under-reported");
+      case e: UnsupportedOperationException =>
+        // this is not unusual; ignore
+        logDebug(s"XAttr not supported on path $path", e);
+      case e: Exception =>
+        // Something else. Log at debug and continue.
+        logDebug(s"XAttr processing failure on $path", e);
+    }
+    Some(len)
+  }
+
+
+  override def newPartition(partitionValues: InternalRow): Unit = {
+    partitions.append(partitionValues)
+  }
+
+  override def newFile(filePath: String): Unit = {
+    submittedFiles += filePath
+    numSubmittedFiles += 1
+  }
+
+  override def closeFile(filePath: String): Unit = {
+    updateFileStats(filePath)
+    submittedFiles.remove(filePath)
+  }
+
+  private def updateFileStats(filePath: String): Unit = {
+    getFileSize(filePath).foreach { len =>
+      numBytes += len
+      numFiles += 1
+    }
+  }
+
+  override def newRow(filePath: String, row: InternalRow): Unit = row match {
+    case fake: FakeRow =>
+      numRows += fake.batch.numRows()
+    case _ =>
+      numRows += 1
+  }
+
+  override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
+    submittedFiles.foreach(updateFileStats)
+    submittedFiles.clear()
+
+    // Reports bytesWritten and recordsWritten to the Spark output metrics.
+    Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
+      outputMetrics.setBytesWritten(numBytes)
+      outputMetrics.setRecordsWritten(numRows)
+    }
+
+    if (numSubmittedFiles != numFiles) {
+      logInfo(s"Expected $numSubmittedFiles files, but only saw $numFiles. " +
+        "This could be due to the output format not writing empty files, " +
+        "or files being not immediately visible in the filesystem.")
+    }
+    taskCommitTimeMetric.foreach(_ += taskCommitTime)
+    BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows)
+  }
+}
+
+
+/**
+ * Simple [[WriteJobStatsTracker]] implementation that's serializable, capable of
+ * instantiating [[BasicWriteTaskStatsTracker]] on executors and processing the
+ * [[BasicWriteTaskStats]] they produce by aggregating the metrics and posting them
+ * as DriverMetricUpdates.
+ */
+class BasicWriteJobStatsTracker(
+    serializableHadoopConf: SerializableConfiguration,
+    @transient val driverSideMetrics: Map[String, SQLMetric],
+    taskCommitTimeMetric: SQLMetric)
+  extends WriteJobStatsTracker {
+
+  def this(
+      serializableHadoopConf: SerializableConfiguration,
+      metrics: Map[String, SQLMetric]) = {
+    this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME))
+  }
+
+  override def newTaskInstance(): WriteTaskStatsTracker = {
+    new BasicWriteTaskStatsTracker(serializableHadoopConf.value, Some(taskCommitTimeMetric))
+  }
+
+  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
+    val sparkContext = SparkContext.getActive.get
+    var partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty
+    var numFiles: Long = 0L
+    var totalNumBytes: Long = 0L
+    var totalNumOutput: Long = 0L
+
+    val basicStats = stats.map(_.asInstanceOf[BasicWriteTaskStats])
+
+    basicStats.foreach { summary =>
+      partitionsSet ++= summary.partitions
+      numFiles += summary.numFiles
+      totalNumBytes += summary.numBytes
+      totalNumOutput += summary.numRows
+    }
+
+    driverSideMetrics(JOB_COMMIT_TIME).add(jobCommitTime)
+    driverSideMetrics(NUM_FILES_KEY).add(numFiles)
+    driverSideMetrics(NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
+    driverSideMetrics(NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
+    driverSideMetrics(NUM_PARTS_KEY).add(partitionsSet.size)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList)
+  }
+}
+
+object BasicWriteJobStatsTracker {
+  private val NUM_FILES_KEY = "numFiles"
+  private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
+  private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
+  private val NUM_PARTS_KEY = "numParts"
+  val TASK_COMMIT_TIME = "taskCommitTime"
+  val JOB_COMMIT_TIME = "jobCommitTime"
+  /** XAttr key of the data length header added in HADOOP-17414. */
+  val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
+
+  def metrics: Map[String, SQLMetric] = {
+    val sparkContext = SparkContext.getActive.get
+    Map(
+      NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
+      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
+      NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"),
+      TASK_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "task commit time"),
+      JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time")
+    )
+  }
+}
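Note: the substantive change in this Spark 3.2.1 shim is newRow, which advances the row count by batch.numRows() when it receives a FakeRow instead of counting one row per call. A rough sketch of exercising the task-level tracker above directly; the file path is a placeholder and the FakeRow call is left commented out because building a ColumnarBatch is not shown here:

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.BasicWriteTaskStatsTracker

    val tracker = new BasicWriteTaskStatsTracker(new Configuration())
    tracker.newFile("/tmp/part-00000-example.parquet")

    // An ordinary InternalRow adds 1 to the row count...
    tracker.newRow("/tmp/part-00000-example.parquet", InternalRow.empty)
    // ...while a FakeRow adds batch.numRows() in a single call, e.g.:
    // tracker.newRow("/tmp/part-00000-example.parquet", new FakeRow(columnarBatch))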
diff --git a/shims/spark322/src/main/scala/org/apache/spark/sql/execution/datasources/BaseWriteStatsTracker.scala b/shims/spark322/src/main/scala/org/apache/spark/sql/execution/datasources/BaseWriteStatsTracker.scala
new file mode 100644
index 000000000..f3017e5a3
--- /dev/null
+++ b/shims/spark322/src/main/scala/org/apache/spark/sql/execution/datasources/BaseWriteStatsTracker.scala
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import java.io.FileNotFoundException
+import java.nio.charset.StandardCharsets
+
+import scala.collection.mutable
+
+import com.intel.oap.spark.sql.FakeRow
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.{SparkContext, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker._
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.util.SerializableConfiguration
+
+
+/**
+ * Simple metrics collected during an instance of [[FileFormatDataWriter]].
+ * These were first introduced in https://github.com/apache/spark/pull/18159 (SPARK-20703).
+ *
+ * We changed the newRow method to support writes with FakeRow.
+ */
+case class BasicWriteTaskStats(
+    partitions: Seq[InternalRow],
+    numFiles: Int,
+    numBytes: Long,
+    numRows: Long)
+  extends WriteTaskStats
+
+
+/**
+ * Simple [[WriteTaskStatsTracker]] implementation that produces [[BasicWriteTaskStats]].
+ */
+class BasicWriteTaskStatsTracker(
+    hadoopConf: Configuration,
+    taskCommitTimeMetric: Option[SQLMetric] = None)
+  extends WriteTaskStatsTracker with Logging {
+
+  private[this] val partitions: mutable.ArrayBuffer[InternalRow] = mutable.ArrayBuffer.empty
+  private[this] var numFiles: Int = 0
+  private[this] var numSubmittedFiles: Int = 0
+  private[this] var numBytes: Long = 0L
+  private[this] var numRows: Long = 0L
+
+  private[this] val submittedFiles = mutable.HashSet[String]()
+
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * @param filePath path to the file
+   * @return the file size or None if the file was not found.
+   */
+  private def getFileSize(filePath: String): Option[Long] = {
+    val path = new Path(filePath)
+    val fs = path.getFileSystem(hadoopConf)
+    getFileSize(fs, path)
+  }
+
+  /**
+   * Get the size of the file expected to have been written by a worker.
+   * This supports the XAttr in HADOOP-17414 when the "magic committer" adds
+   * a custom HTTP header to the zero byte marker.
+   * If the length returned by getFileStatus is > 0 then that length is
+   * returned. For zero-byte files, the (optional) Hadoop FS API getXAttr() is
+   * invoked. If a parseable, non-negative length can be retrieved, it is
+   * returned instead of the zero length.
+   * @return the file size or None if the file was not found.
+   */
+  private [datasources] def getFileSize(fs: FileSystem, path: Path): Option[Long] = {
+    // the normal file status probe.
+    try {
+      val len = fs.getFileStatus(path).getLen
+      if (len > 0) {
+        return Some(len)
+      }
+    } catch {
+      case e: FileNotFoundException =>
+        // may arise against eventually consistent object stores.
+        logDebug(s"File $path is not yet visible", e)
+        return None
+    }
+
+    // Output File Size is 0. Look to see if it has an attribute
+    // declaring a future-file-length.
+    // Failure of API call, parsing, invalid value all return the
+    // 0 byte length.
+
+    var len = 0L
+    try {
+      val attr = fs.getXAttr(path, BasicWriteJobStatsTracker.FILE_LENGTH_XATTR)
+      if (attr != null && attr.nonEmpty) {
+        val str = new String(attr, StandardCharsets.UTF_8)
+        logDebug(s"File Length statistics for $path retrieved from XAttr: $str")
+        // a non-empty header was found. parse to a long via the java class
+        val l = java.lang.Long.parseLong(str)
+        if (l > 0) {
+          len = l
+        } else {
+          logDebug("Ignoring negative value in XAttr file length")
+        }
+      }
+    } catch {
+      case e: NumberFormatException =>
+        // warn but don't dump the whole stack
+        logInfo(s"Failed to parse" +
+          s" ${BasicWriteJobStatsTracker.FILE_LENGTH_XATTR}:$e;" +
+          s" bytes written may be under-reported");
+      case e: UnsupportedOperationException =>
+        // this is not unusual; ignore
+        logDebug(s"XAttr not supported on path $path", e);
+      case e: Exception =>
+        // Something else. Log at debug and continue.
+        logDebug(s"XAttr processing failure on $path", e);
+    }
+    Some(len)
+  }
+
+
+  override def newPartition(partitionValues: InternalRow): Unit = {
+    partitions.append(partitionValues)
+  }
+
+  override def newFile(filePath: String): Unit = {
+    submittedFiles += filePath
+    numSubmittedFiles += 1
+  }
+
+  override def closeFile(filePath: String): Unit = {
+    updateFileStats(filePath)
+    submittedFiles.remove(filePath)
+  }
+
+  private def updateFileStats(filePath: String): Unit = {
+    getFileSize(filePath).foreach { len =>
+      numBytes += len
+      numFiles += 1
+    }
+  }
+
+  override def newRow(filePath: String, row: InternalRow): Unit = row match {
+    case fake: FakeRow =>
+      numRows += fake.batch.numRows()
+    case _ =>
+      numRows += 1
+  }
+
+  override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
+    submittedFiles.foreach(updateFileStats)
+    submittedFiles.clear()
+
+    // Reports bytesWritten and recordsWritten to the Spark output metrics.
+    Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
+      outputMetrics.setBytesWritten(numBytes)
+      outputMetrics.setRecordsWritten(numRows)
+    }
+
+    if (numSubmittedFiles != numFiles) {
+      logInfo(s"Expected $numSubmittedFiles files, but only saw $numFiles. " +
+        "This could be due to the output format not writing empty files, " +
+        "or files being not immediately visible in the filesystem.")
+    }
+    taskCommitTimeMetric.foreach(_ += taskCommitTime)
+    BasicWriteTaskStats(partitions.toSeq, numFiles, numBytes, numRows)
+  }
+}
+
+
+/**
+ * Simple [[WriteJobStatsTracker]] implementation that's serializable, capable of
+ * instantiating [[BasicWriteTaskStatsTracker]] on executors and processing the
+ * [[BasicWriteTaskStats]] they produce by aggregating the metrics and posting them
+ * as DriverMetricUpdates.
+ */
+class BasicWriteJobStatsTracker(
+    serializableHadoopConf: SerializableConfiguration,
+    @transient val driverSideMetrics: Map[String, SQLMetric],
+    taskCommitTimeMetric: SQLMetric)
+  extends WriteJobStatsTracker {
+
+  def this(
+      serializableHadoopConf: SerializableConfiguration,
+      metrics: Map[String, SQLMetric]) = {
+    this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME))
+  }
+
+  override def newTaskInstance(): WriteTaskStatsTracker = {
+    new BasicWriteTaskStatsTracker(serializableHadoopConf.value, Some(taskCommitTimeMetric))
+  }
+
+  override def processStats(stats: Seq[WriteTaskStats], jobCommitTime: Long): Unit = {
+    val sparkContext = SparkContext.getActive.get
+    var partitionsSet: mutable.Set[InternalRow] = mutable.HashSet.empty
+    var numFiles: Long = 0L
+    var totalNumBytes: Long = 0L
+    var totalNumOutput: Long = 0L
+
+    val basicStats = stats.map(_.asInstanceOf[BasicWriteTaskStats])
+
+    basicStats.foreach { summary =>
+      partitionsSet ++= summary.partitions
+      numFiles += summary.numFiles
+      totalNumBytes += summary.numBytes
+      totalNumOutput += summary.numRows
+    }
+
+    driverSideMetrics(JOB_COMMIT_TIME).add(jobCommitTime)
+    driverSideMetrics(NUM_FILES_KEY).add(numFiles)
+    driverSideMetrics(NUM_OUTPUT_BYTES_KEY).add(totalNumBytes)
+    driverSideMetrics(NUM_OUTPUT_ROWS_KEY).add(totalNumOutput)
+    driverSideMetrics(NUM_PARTS_KEY).add(partitionsSet.size)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList)
+  }
+}
+
+object BasicWriteJobStatsTracker {
+  private val NUM_FILES_KEY = "numFiles"
+  private val NUM_OUTPUT_BYTES_KEY = "numOutputBytes"
+  private val NUM_OUTPUT_ROWS_KEY = "numOutputRows"
+  private val NUM_PARTS_KEY = "numParts"
+  val TASK_COMMIT_TIME = "taskCommitTime"
+  val JOB_COMMIT_TIME = "jobCommitTime"
+  /** XAttr key of the data length header added in HADOOP-17414. */
+  val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"
+
+  def metrics: Map[String, SQLMetric] = {
+    val sparkContext = SparkContext.getActive.get
+    Map(
+      NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
+      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
+      NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part"),
+      TASK_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "task commit time"),
+      JOB_COMMIT_TIME -> SQLMetrics.createTimingMetric(sparkContext, "job commit time")
+    )
+  }
+}
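Note: the zero-byte "magic committer" marker handling above degrades gracefully: a missing, unparseable or non-positive "header.x-hadoop-s3a-magic-data-length" XAttr simply yields a 0-byte length rather than failing the write. A small self-contained illustration of that fallback (not code from this patch; object and method names are hypothetical):

    import java.nio.charset.StandardCharsets

    object MagicLengthFallback {
      // Mirrors the parsing rules used above: non-numeric or non-positive values
      // fall back to a length of 0 instead of raising an error.
      def lengthFromXAttr(attr: Array[Byte]): Long = {
        if (attr == null || attr.isEmpty) return 0L
        try {
          val len = java.lang.Long.parseLong(new String(attr, StandardCharsets.UTF_8))
          if (len > 0) len else 0L
        } catch {
          case _: NumberFormatException => 0L
        }
      }
    }

    // MagicLengthFallback.lengthFromXAttr("42".getBytes(StandardCharsets.UTF_8))   == 42L
    // MagicLengthFallback.lengthFromXAttr("oops".getBytes(StandardCharsets.UTF_8)) == 0L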