#522 Metadata copying routine is not needed for the fix after all.
yruslan committed Aug 21, 2024
1 parent ce475b4 commit c0bc312
Showing 2 changed files with 0 additions and 170 deletions.
@@ -517,77 +517,6 @@ object SparkUtils {
    output.toString()
  }

  /**
    * Copies metadata from one schema to another as long as names and data types are the same.
    *
    * @param schemaFrom      Schema to copy metadata from.
    * @param schemaTo        Schema to copy metadata to.
    * @param overwrite       If true, the metadata of schemaTo is not retained.
    * @param sourcePreferred If true, schemaFrom metadata is used on conflicts, schemaTo otherwise.
    * @return Same schema as schemaTo with metadata from schemaFrom.
    */
  def copyMetadata(schemaFrom: StructType,
                   schemaTo: StructType,
                   overwrite: Boolean = false,
                   sourcePreferred: Boolean = false): StructType = {
    // Later `withMetadata` calls overwrite earlier keys, so the preferred
    // side is always applied last.
    def joinMetadata(from: Metadata, to: Metadata): Metadata = {
      val newMetadataMerged = new MetadataBuilder

      if (sourcePreferred) {
        newMetadataMerged.withMetadata(to)
        newMetadataMerged.withMetadata(from)
      } else {
        newMetadataMerged.withMetadata(from)
        newMetadataMerged.withMetadata(to)
      }

      newMetadataMerged.build()
    }

    // Unwraps nested arrays until the element type is a struct (copied
    // recursively) or a leaf type (kept as-is).
    @tailrec
    def processArray(ar: ArrayType, fieldFrom: StructField, fieldTo: StructField): ArrayType = {
      ar.elementType match {
        case st: StructType if fieldFrom.dataType.isInstanceOf[ArrayType] && fieldFrom.dataType.asInstanceOf[ArrayType].elementType.isInstanceOf[StructType] =>
          val innerStructFrom = fieldFrom.dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType]
          val newDataType = StructType(copyMetadata(innerStructFrom, st).fields)
          ArrayType(newDataType, ar.containsNull)
        case at: ArrayType =>
          processArray(at, fieldFrom, fieldTo)
        case p =>
          ArrayType(p, ar.containsNull)
      }
    }

    val fieldsMap = schemaFrom.fields.map(f => (f.name, f)).toMap

    val newFields: Array[StructField] = schemaTo.fields.map { fieldTo =>
      fieldsMap.get(fieldTo.name) match {
        case Some(fieldFrom) =>
          val newMetadata = if (overwrite) {
            fieldFrom.metadata
          } else {
            joinMetadata(fieldFrom.metadata, fieldTo.metadata)
          }

          fieldTo.dataType match {
            case st: StructType if fieldFrom.dataType.isInstanceOf[StructType] =>
              val newDataType = StructType(copyMetadata(fieldFrom.dataType.asInstanceOf[StructType], st).fields)
              fieldTo.copy(dataType = newDataType, metadata = newMetadata)
            case at: ArrayType =>
              val newType = processArray(at, fieldFrom, fieldTo)
              fieldTo.copy(dataType = newType, metadata = newMetadata)
            case _ =>
              fieldTo.copy(metadata = newMetadata)
          }
        case None =>
          fieldTo
      }
    }

    StructType(newFields)
  }


  private def getActualProcessingTimeUdf: UserDefinedFunction = {
    udf((_: Long) => Instant.now().getEpochSecond)
  }
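
For context, a minimal usage sketch of the removed routine (hypothetical, not part of this commit; it assumes a SparkSession named `spark` is in scope and mirrors the deleted tests below):

// Hypothetical sketch: `spark` is an existing SparkSession.
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
import spark.implicits._

val df = List(1, 2, 3).toDF("col1")

// Attach a metadata entry to a copy of the schema.
val meta = new MetadataBuilder().putString("comment", "Test").build()
val schemaFrom = StructType(Seq(df.schema.fields.head.copy(metadata = meta)))

// By default, metadata is merged and schemaTo wins on conflicting keys;
// overwrite = true keeps only schemaFrom's metadata.
val merged = SparkUtils.copyMetadata(schemaFrom, df.schema)
val dfWithMeta = spark.createDataFrame(df.rdd, merged)

assert(dfWithMeta.schema.fields.head.metadata.getString("comment") == "Test")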
@@ -720,103 +720,4 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
    }
  }

"copyMetadata" should {
"copy metadata from one schema to another when overwrite = false" in {
val df1 = List(1, 2, 3).toDF("col1")
val df2 = List(1, 2, 3).toDF("col1")

val metadata1 = new MetadataBuilder()
metadata1.putString("comment", "Test")

val metadata2 = new MetadataBuilder()
metadata2.putLong("maxLength", 120)

val schema1WithMetadata = StructType(Seq(df1.schema.fields.head.copy(metadata = metadata1.build())))
val schema2WithMetadata = StructType(Seq(df2.schema.fields.head.copy(metadata = metadata2.build())))

val df1WithMetadata = spark.createDataFrame(df2.rdd, schema1WithMetadata)

val schemaWithMetadata = SparkUtils.copyMetadata(df1WithMetadata.schema, schema2WithMetadata)

val newDf = spark.createDataFrame(df2.rdd, schemaWithMetadata)

assert(newDf.schema.fields.head.metadata.getString("comment") == "Test")
assert(newDf.schema.fields.head.metadata.getLong("maxLength") == 120)
}

"retain metadata on conflicts by default" in {
val df1 = List(1, 2, 3).toDF("col1")
val df2 = List(1, 2, 3).toDF("col1")

val metadata1 = new MetadataBuilder()
metadata1.putString("comment", "Test")
metadata1.putLong("maxLength", 100)

val metadata2 = new MetadataBuilder()
metadata2.putLong("maxLength", 120)
metadata2.putLong("newMetadata", 180)

val schema1WithMetadata = StructType(Seq(df1.schema.fields.head.copy(metadata = metadata1.build())))
val schema2WithMetadata = StructType(Seq(df2.schema.fields.head.copy(metadata = metadata2.build())))

val df1WithMetadata = spark.createDataFrame(df2.rdd, schema1WithMetadata)

val schemaWithMetadata = SparkUtils.copyMetadata(df1WithMetadata.schema, schema2WithMetadata)

val newDf = spark.createDataFrame(df2.rdd, schemaWithMetadata)

assert(newDf.schema.fields.head.metadata.getString("comment") == "Test")
assert(newDf.schema.fields.head.metadata.getLong("maxLength") == 120)
assert(newDf.schema.fields.head.metadata.getLong("newMetadata") == 180)
}

"overwrite metadata on conflicts when sourcePreferred=true" in {
val df1 = List(1, 2, 3).toDF("col1")
val df2 = List(1, 2, 3).toDF("col1")

val metadata1 = new MetadataBuilder()
metadata1.putString("comment", "Test")
metadata1.putLong("maxLength", 100)

val metadata2 = new MetadataBuilder()
metadata2.putLong("maxLength", 120)
metadata2.putLong("newMetadata", 180)

val schema1WithMetadata = StructType(Seq(df1.schema.fields.head.copy(metadata = metadata1.build())))
val schema2WithMetadata = StructType(Seq(df2.schema.fields.head.copy(metadata = metadata2.build())))

val df1WithMetadata = spark.createDataFrame(df2.rdd, schema1WithMetadata)

val schemaWithMetadata = SparkUtils.copyMetadata(df1WithMetadata.schema, schema2WithMetadata, sourcePreferred = true)

val newDf = spark.createDataFrame(df2.rdd, schemaWithMetadata)

assert(newDf.schema.fields.head.metadata.getString("comment") == "Test")
assert(newDf.schema.fields.head.metadata.getLong("maxLength") == 100)
assert(newDf.schema.fields.head.metadata.getLong("newMetadata") == 180)
}

"not retain original metadata when overwrite = true" in {
val df1 = List(1, 2, 3).toDF("col1")
val df2 = List(1, 2, 3).toDF("col1")

val metadata1 = new MetadataBuilder()
metadata1.putString("comment", "Test")

val metadata2 = new MetadataBuilder()
metadata2.putLong("maxLength", 120)

val schema1WithMetadata = StructType(Seq(df1.schema.fields.head.copy(metadata = metadata1.build())))
val schema2WithMetadata = StructType(Seq(df2.schema.fields.head.copy(metadata = metadata2.build())))

val df1WithMetadata = spark.createDataFrame(df2.rdd, schema1WithMetadata)

val schemaWithMetadata = SparkUtils.copyMetadata(df1WithMetadata.schema, schema2WithMetadata, overwrite = true)

val newDf = spark.createDataFrame(df2.rdd, schemaWithMetadata)

assert(newDf.schema.fields.head.metadata.getString("comment") == "Test")
assert(!newDf.schema.fields.head.metadata.contains("maxLength"))
}
}
}
