Commit dfb8800: upmerge
andygrove committed Jun 18, 2024
2 parents ae23ac8 + d584229
Showing 46 changed files with 170 additions and 506 deletions.
13 changes: 1 addition & 12 deletions .github/workflows/pr_build.yml
@@ -111,13 +111,6 @@ jobs:
test-target: [java]
spark-version: ['3.3', '3.4']
scala-version: ['2.12', '2.13']
exclude:
- java_version: 17
spark-version: '3.2'
- java_version: 11
spark-version: '3.2'
- spark-version: '3.2'
scala-version: '2.13'
fail-fast: false
name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
runs-on: ${{ matrix.os }}
@@ -254,15 +247,11 @@ jobs:
matrix:
java_version: [8, 17]
test-target: [java]
spark-version: ['3.2', '3.3']
spark-version: ['3.3']
scala-version: ['2.12', '2.13']
exclude:
- java_version: 17
spark-version: '3.2'
- java_version: 8
spark-version: '3.3'
- spark-version: '3.2'
scala-version: '2.13'
fail-fast: false
name: macos-14(Silicon)/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
runs-on: macos-14
@@ -20,10 +20,9 @@
package org.apache.comet.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.comet.shims.ShimCometParquetUtils
import org.apache.spark.sql.internal.SQLConf

object CometParquetUtils extends ShimCometParquetUtils {
object CometParquetUtils {
private val PARQUET_FIELD_ID_WRITE_ENABLED = "spark.sql.parquet.fieldId.write.enabled"
private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled"
private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing"
@@ -27,10 +27,9 @@ import org.apache.parquet.schema._
import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.types._

import org.apache.comet.parquet.CometParquetUtils

/**
* This class is copied & slightly modified from [[ParquetReadSupport]] in Spark. Changes:
* - This doesn't extend from Parquet's `ReadSupport` class since that is used for row-based
@@ -53,7 +52,7 @@ object CometParquetReadSupport {
ignoreMissingIds: Boolean): MessageType = {
if (!ignoreMissingIds &&
!containsFieldIds(parquetSchema) &&
CometParquetUtils.hasFieldIds(catalystSchema)) {
ParquetUtils.hasFieldIds(catalystSchema)) {
throw new RuntimeException(
"Spark read schema expects field Ids, " +
"but Parquet file schema doesn't contain any field Ids.\n" +
@@ -334,14 +333,14 @@
}

def matchIdField(f: StructField): Type = {
val fieldId = CometParquetUtils.getFieldId(f)
val fieldId = ParquetUtils.getFieldId(f)
idToParquetFieldMap
.get(fieldId)
.map { parquetTypes =>
if (parquetTypes.size > 1) {
// Need to fail if there is ambiguity, i.e. more than one field is matched
val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
throw CometParquetUtils.foundDuplicateFieldInFieldIdLookupModeError(
throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
fieldId,
parquetTypesString)
} else {
@@ -355,9 +354,9 @@
}
}

val shouldMatchById = useFieldId && CometParquetUtils.hasFieldIds(structType)
val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType)
structType.map { f =>
if (shouldMatchById && CometParquetUtils.hasFieldId(f)) {
if (shouldMatchById && ParquetUtils.hasFieldId(f)) {
matchIdField(f)
} else if (caseSensitive) {
matchCaseSensitiveField(f)
@@ -26,6 +26,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

@@ -66,8 +67,8 @@ class CometSparkToParquetSchemaConverter(
*/
def convertField(field: StructField): Type = {
val converted = convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
if (useFieldId && CometParquetUtils.hasFieldId(field)) {
converted.withId(CometParquetUtils.getFieldId(field))
if (useFieldId && ParquetUtils.hasFieldId(field)) {
converted.withId(ParquetUtils.getFieldId(field))
} else {
converted
}
@@ -24,7 +24,11 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile

object ShimBatchReader {

<<<<<<< HEAD
// TODO: create specific shim per Spark version rather than use reflection
=======
// TODO: remove after dropping Spark 3.3 support and directly call PartitionedFile
>>>>>>> apache/main
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
classOf[PartitionedFile].getDeclaredConstructors
.map(c =>
@@ -21,15 +21,12 @@ package org.apache.comet.shims

object ShimFileFormat {

// TODO: remove after dropping Spark 3.2 & 3.3 support and directly use FileFormat.ROW_INDEX
// TODO: remove after dropping Spark 3.3 support and directly use FileFormat.ROW_INDEX
val ROW_INDEX = "row_index"

// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
// TODO: remove after dropping Spark 3.2 & 3.3 support and directly use
// TODO: remove after dropping Spark 3.3 support and directly use
// FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = s"_tmp_metadata_$ROW_INDEX"

// TODO: remove after dropping Spark 3.2 support and use FileFormat.OPTION_RETURNING_BATCH
val OPTION_RETURNING_BATCH = "returning_batch"
}
@@ -24,7 +24,7 @@ import scala.util.Try
import org.apache.spark.sql.types.{StructField, StructType}

object ShimResolveDefaultColumns {
// TODO: remove after dropping Spark 3.2 & 3.3 support and directly use ResolveDefaultColumns
// TODO: remove after dropping Spark 3.3 support and directly use ResolveDefaultColumns
def getExistenceDefaultValue(field: StructField): Any =
Try {
// scalastyle:off classforname

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/errors.rs
@@ -61,7 +61,7 @@ pub enum CometError {
Internal(String),

// Note that this message format is based on Spark 3.4 and is more detailed than the message
// returned by Spark 3.2 or 3.3
// returned by Spark 3.3
#[error("[CAST_INVALID_INPUT] The value '{value}' of the type \"{from_type}\" cannot be cast to \"{to_type}\" \
because it is malformed. Correct the value as per the syntax, or change its target type. \
Use `try_cast` to tolerate malformed input and return NULL instead. If necessary \
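The `CometError` variant above derives its message from a `#[error(...)]` attribute that interpolates the offending value and its source/target types into a Spark-3.4-style error string. A minimal sketch of that pattern only, assuming the thiserror crate; the enum and field names below are illustrative, not Comet's actual definitions:

```rust
// Sketch only: a thiserror-derived enum whose variant formats named fields
// into its Display output, mirroring the CAST_INVALID_INPUT message shape.
use thiserror::Error;

#[derive(Debug, Error)]
enum SketchError {
    #[error(
        "[CAST_INVALID_INPUT] The value '{value}' of the type \"{from_type}\" \
         cannot be cast to \"{to_type}\" because it is malformed."
    )]
    CastInvalidInput {
        value: String,
        from_type: String,
        to_type: String,
    },
}

fn main() {
    let err = SketchError::CastInvalidInput {
        value: "str".to_string(),
        from_type: "STRING".to_string(),
        to_type: "TINYINT".to_string(),
    };
    // The Display impl (and thus the message text) comes from #[error(...)].
    println!("{err}");
}
```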
31 changes: 30 additions & 1 deletion core/src/execution/datafusion/expressions/cast.rs
@@ -498,7 +498,7 @@ impl Cast {

fn cast_array(&self, array: ArrayRef) -> DataFusionResult<ArrayRef> {
let to_type = &self.data_type;
let array = array_with_timezone(array, self.timezone.clone(), Some(to_type));
let array = array_with_timezone(array, self.timezone.clone(), Some(to_type))?;
let from_type = array.data_type().clone();

// unpack dictionary string arrays first
@@ -1641,6 +1641,8 @@ mod tests {
use arrow_array::StringArray;
use arrow_schema::TimeUnit;

use datafusion_physical_expr::expressions::Column;

use super::*;

#[test]
@@ -1897,4 +1899,31 @@
assert!(cast_string_to_i8("0.2", EvalMode::Ansi).is_err());
assert!(cast_string_to_i8(".", EvalMode::Ansi).is_err());
}

#[test]
fn test_cast_unsupported_timestamp_to_date() {
// Since datafusion uses chrono::DateTime internally, not all dates representable by TimestampMicrosecondType are supported
let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
let cast = Cast::new(
Arc::new(Column::new("a", 0)),
DataType::Date32,
EvalMode::Legacy,
"UTC".to_owned(),
);
let result = cast.cast_array(Arc::new(timestamps.with_timezone("Europe/Copenhagen")));
assert!(result.is_err())
}

#[test]
fn test_cast_invalid_timezone() {
let timestamps: PrimitiveArray<TimestampMicrosecondType> = vec![i64::MAX].into();
let cast = Cast::new(
Arc::new(Column::new("a", 0)),
DataType::Date32,
EvalMode::Legacy,
"Not a valid timezone".to_owned(),
);
let result = cast.cast_array(Arc::new(timestamps.with_timezone("Europe/Copenhagen")));
assert!(result.is_err())
}
}
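The new tests hinge on the comment above: DataFusion converts Arrow timestamps through chrono, and chrono's DateTime cannot represent every i64 microsecond value, so extreme timestamps (and invalid timezones) must surface as errors rather than panics. A small standalone illustration of that range gap, assuming arrow-rs exposes `timestamp_us_to_datetime` in `arrow_array::temporal_conversions` as in recent releases:

```rust
// Sketch of the range mismatch behind test_cast_unsupported_timestamp_to_date:
// chrono cannot represent every i64 microsecond timestamp.
use arrow_array::temporal_conversions::timestamp_us_to_datetime;

fn main() {
    // An ordinary timestamp (2024-06-18T00:00:00 UTC) converts fine...
    assert!(timestamp_us_to_datetime(1_718_668_800_000_000).is_some());
    // ...but i64::MAX microseconds (roughly year 292,277) is outside chrono's
    // supported range, so the conversion yields None and the cast must error.
    assert!(timestamp_us_to_datetime(i64::MAX).is_none());
}
```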
10 changes: 5 additions & 5 deletions core/src/execution/datafusion/expressions/temporal.rs
@@ -100,7 +100,7 @@ impl PhysicalExpr for HourExec {
Microsecond,
Some(self.timezone.clone().into()),
)),
);
)?;
let result = date_part(&array, DatePart::Hour)?;

Ok(ColumnarValue::Array(result))
@@ -194,7 +194,7 @@ impl PhysicalExpr for MinuteExec {
Microsecond,
Some(self.timezone.clone().into()),
)),
);
)?;
let result = date_part(&array, DatePart::Minute)?;

Ok(ColumnarValue::Array(result))
@@ -288,7 +288,7 @@ impl PhysicalExpr for SecondExec {
Microsecond,
Some(self.timezone.clone().into()),
)),
);
)?;
let result = date_part(&array, DatePart::Second)?;

Ok(ColumnarValue::Array(result))
@@ -489,7 +489,7 @@ impl PhysicalExpr for TimestampTruncExec {
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
);
)?;
let result = timestamp_trunc_dyn(&ts, format)?;
Ok(ColumnarValue::Array(result))
}
@@ -498,7 +498,7 @@
ts,
tz.clone(),
Some(&DataType::Timestamp(Microsecond, Some(tz.into()))),
);
)?;
let result = timestamp_trunc_array_fmt_dyn(&ts, &formats)?;
Ok(ColumnarValue::Array(result))
}
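The Hour/Minute/Second and TimestampTrunc expressions above now propagate failures from `array_with_timezone` with `?` before handing the array to arrow's `date_part` kernel. A minimal sketch of that kernel in isolation, assuming the `arrow_arith::temporal::{date_part, DatePart}` API visible in the diff:

```rust
// Sketch of the date_part kernel the Hour/Minute/Second expressions call.
use arrow_arith::temporal::{date_part, DatePart};
use arrow_array::{cast::AsArray, types::Int32Type, TimestampMicrosecondArray};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 2024-06-18T12:34:56 UTC as microseconds since the epoch.
    let ts = TimestampMicrosecondArray::from(vec![1_718_714_096_000_000])
        .with_timezone("UTC");

    // date_part returns an Int32 array holding the requested component.
    let hours = date_part(&ts, DatePart::Hour)?;
    assert_eq!(hours.as_primitive::<Int32Type>().value(0), 12);

    let minutes = date_part(&ts, DatePart::Minute)?;
    assert_eq!(minutes.as_primitive::<Int32Type>().value(0), 34);
    Ok(())
}
```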