diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/UpdateExpressionsSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/UpdateExpressionsSupport.scala index 7fc7a5eb6a..2fbdb282fe 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/UpdateExpressionsSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/UpdateExpressionsSupport.scala @@ -114,6 +114,31 @@ trait UpdateExpressionsSupport extends CastSupport with SQLConfHelper with Analy ArrayType(toEt, containsNull = true) ) } + case (from: MapType, to: MapType) if !Cast.canCast(from, to) => + // Manually convert map keys and values if the types are not compatible to allow schema + // evolution. This is slower than direct cast so we only do it when required. + def createMapConverter(convert: (Expression, Expression) => Expression): Expression = { + val keyVar = NamedLambdaVariable("keyVar", from.keyType, nullable = false) + val valueVar = + NamedLambdaVariable("valueVar", from.valueType, from.valueContainsNull) + LambdaFunction(convert(keyVar, valueVar), Seq(keyVar, valueVar)) + } + + var transformedKeysAndValues = fromExpression + if (from.keyType != to.keyType) { + transformedKeysAndValues = + TransformKeys(transformedKeysAndValues, createMapConverter { + (key, _) => castIfNeeded(key, to.keyType, allowStructEvolution) + }) + } + + if (from.valueType != to.valueType) { + transformedKeysAndValues = + TransformValues(transformedKeysAndValues, createMapConverter { + (_, value) => castIfNeeded(value, to.valueType, allowStructEvolution) + }) + } + cast(transformedKeysAndValues, to) case (from: StructType, to: StructType) if !DataType.equalsIgnoreCaseAndNullability(from, to) && resolveStructsByName => // All from fields must be present in the final schema, or we'll silently lose data. diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala index 7fa7be2258..80d7de63fe 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala @@ -3454,6 +3454,325 @@ abstract class MergeIntoSuiteBase """{ "key": "A", "value": [ { "a": { "y": 20, "x": [ { "c": 10, "d": 30, "e": null } ] }, "b": 2 } ] } { "key": "B", "value": [ { "a": { "y": 60, "x": [ { "c": 20, "d": 50, "e": null } ] }, "b": 3 } ] }""", expectErrorWithoutEvolutionContains = "Cannot cast") + + // Struct evolution inside of map values. + testNestedStructsEvolution("new source column in map struct value")( + target = + """{ "key": "A", "map": { "key": { "a": 1 } } } + { "key": "C", "map": { "key": { "a": 3 } } }""", + source = + """{ "key": "A", "map": { "key": { "a": 2, "b": 2 } } } + { "key": "B", "map": { "key": { "a": 1, "b": 2 } } }""", + targetSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType))), + sourceSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", IntegerType))), + resultSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", IntegerType))), + clauses = update("*") :: insert("*") :: Nil, + result = + """{ "key": "A", "map": { "key": { "a": 2, "b": 2 } } } + { "key": "B", "map": { "key": { "a": 1, "b": 2 } } } + { "key": "C", "map": { "key": { "a": 3, "b": null } } }""", + expectErrorWithoutEvolutionContains = "Cannot cast") + + testNestedStructsEvolution("new source column in nested map struct value")( + target = + """{"key": "A", "map": { "key": { "innerKey": { "a": 1 } } } } + {"key": "C", "map": { "key": { "innerKey": { "a": 3 } } } }""", + source = + """{"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } } + {"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }""", + targetSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + MapType(StringType, new StructType().add("a", IntegerType)))), + sourceSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))), + resultSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))), + clauses = update("*") :: insert("*") :: Nil, + result = + """{"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } } + {"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } } + {"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": null } } } }""", + expectErrorWithoutEvolutionContains = "Cannot cast") + + testNestedStructsEvolution("source map struct value contains less columns than target")( + target = + """{ "key": "A", "map": { "key": { "a": 1, "b": 1 } } } + { "key": "C", "map": { "key": { "a": 3, "b": 1 } } }""", + source = + """{ "key": "A", "map": { "key": { "a": 2 } } } + { "key": "B", "map": { "key": { "a": 1 } } }""", + targetSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", IntegerType))), + sourceSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType))), + resultSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", IntegerType))), + clauses = update("*") :: insert("*") :: Nil, + result = + """{ "key": "A", "map": { "key": { "a": 2, "b": null } } } + { "key": "B", "map": { "key": { "a": 1, "b": null } } } + { "key": "C", "map": { "key": { "a": 3, "b": 1 } } }""", + expectErrorWithoutEvolutionContains = "Cannot cast") + + testNestedStructsEvolution("source nested map struct value contains less columns than target")( + target = + """{"key": "A", "map": { "key": { "innerKey": { "a": 1, "b": 1 } } } } + {"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": 1 } } } }""", + source = + """{"key": "A", "map": { "key": { "innerKey": { "a": 2 } } } } + {"key": "B", "map": { "key": { "innerKey": { "a": 2 } } } }""", + targetSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))), + sourceSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + MapType(StringType, new StructType().add("a", IntegerType)))), + resultSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))), + clauses = update("*") :: insert("*") :: Nil, + result = + """{"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": null } } } } + {"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": null } } } } + {"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": 1 } } } }""", + expectErrorWithoutEvolutionContains = "Cannot cast") + + testNestedStructsEvolution("source nested map struct value contains different type than target")( + target = + """{"key": "A", "map": { "key": { "a": 1, "b" : 1 } } } + {"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""", + source = + """{"key": "A", "map": { "key": { "a": 1, "b" : "2" } } } + {"key": "B", "map": { "key": { "a": 2, "b" : "2" } } }""", + targetSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", IntegerType))), + sourceSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", StringType))), + resultSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", IntegerType))), + clauses = update("*") :: insert("*") :: Nil, + result = + """{"key": "A", "map": { "key": { "a": 1, "b" : 2 } } } + {"key": "B", "map": { "key": { "a": 2, "b" : 2 } } } + {"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""", + resultWithoutEvolution = + """{"key": "A", "map": { "key": { "a": 1, "b" : 2 } } } + {"key": "B", "map": { "key": { "a": 2, "b" : 2 } } } + {"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""") + + + testNestedStructsEvolution("source nested map struct value in different order")( + target = + """{"key": "A", "map": { "key": { "a" : 1, "b" : 1 } } } + {"key": "C", "map": { "key": { "a" : 3, "b" : 1 } } }""", + source = + """{"key": "A", "map": { "key": { "b" : 2, "a" : 1, "c" : 3 } } } + {"key": "B", "map": { "key": { "b" : 2, "a" : 2, "c" : 4 } } }""", + targetSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", IntegerType))), + sourceSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType))), + resultSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType))), + clauses = update("*") :: insert("*") :: Nil, + result = + """{"key": "A", "map": { "key": { "a": 1, "b" : 2, "c" : 3 } } } + {"key": "B", "map": { "key": { "a": 2, "b" : 2, "c" : 4 } } } + {"key": "C", "map": { "key": { "a": 3, "b" : 1, "c" : null } } }""", + expectErrorWithoutEvolutionContains = "Cannot cast") + + testNestedStructsEvolution("source map struct value to map array value")( + target = + """{ "key": "A", "map": { "key": [ 1, 2 ] } } + { "key": "C", "map": { "key": [ 3, 4 ] } }""", + source = + """{ "key": "A", "map": { "key": { "a": 2 } } } + { "key": "B", "map": { "key": { "a": 1 } } }""", + targetSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + ArrayType(IntegerType))), + sourceSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + new StructType().add("a", IntegerType))), + clauses = update("*") :: insert("*") :: Nil, + expectErrorContains = "Failed to merge incompatible data types", + expectErrorWithoutEvolutionContains = "Cannot cast") + + testNestedStructsEvolution("source struct nested in map array values contains more columns in different order")( + target = + """{ "key": "A", "map": { "key": [ { "a": 1, "b": 2 } ] } } + { "key": "C", "map": { "key": [ { "a": 3, "b": 4 } ] } }""", + source = + """{ "key": "A", "map": { "key": [ { "b": 6, "c": 7, "a": 5 } ] } } + { "key": "B", "map": { "key": [ { "b": 9, "c": 10, "a": 8 } ] } }""", + targetSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + ArrayType( + new StructType().add("a", IntegerType).add("b", IntegerType)))), + sourceSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + ArrayType( + new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType)))), + resultSchema = new StructType() + .add("key", StringType) + .add("map", MapType( + StringType, + ArrayType( + new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType)))), + clauses = update("*") :: insert("*") :: Nil, + result = + """{ "key": "A", "map": { "key": [ { "a": 5, "b": 6, "c": 7 } ] } } + { "key": "B", "map": { "key": [ { "a": 8, "b": 9, "c": 10 } ] } } + { "key": "C", "map": { "key": [ { "a": 3, "b": 4, "c": null } ] } }""", + expectErrorWithoutEvolutionContains = "Cannot cast") + + // Struct evolution inside of map keys. + testEvolution("new source column in map struct key")( + targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"), + sourceData = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2)).toDF("key", "a", "b", "c", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"), + clauses = update("*") :: insert("*") :: Nil, + expected = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2), (3, 5, 6, null, 7)) + .asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer)]] + .toDF("key", "a", "b", "c", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"), + expectErrorWithoutEvolutionContains = "Cannot cast" + ) + + testEvolution("source nested map struct key contains less columns than target")( + targetData = Seq((1, 2, 3, 4, 5), (3, 6, 7, 8, 9)).toDF("key", "a", "b", "c", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"), + sourceData = Seq((1, 10, 50, 1), (2, 20, 60, 2)).toDF("key", "a", "c", "value") + .selectExpr("key", "map(named_struct('a', a, 'c', c), value) as x"), + clauses = update("*") :: insert("*") :: Nil, + expected = Seq((1, 10, null, 50, 1), (2, 20, null, 60, 2), (3, 6, 7, 8, 9)) + .asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer)]] + .toDF("key", "a", "b", "c", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"), + expectErrorWithoutEvolutionContains = "Cannot cast" + ) + + testEvolution("source nested map struct key contains different type than target")( + targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"), + sourceData = Seq((1, 10, "30", 1), (2, 20, "40", 2)).toDF("key", "a", "b", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"), + clauses = update("*") :: insert("*") :: Nil, + expected = Seq((1, 10, 30, 1), (2, 20, 40, 2), (3, 5, 6, 7)) + .asInstanceOf[List[(Integer, Integer, Integer, Integer)]] + .toDF("key", "a", "b", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"), + expectedWithoutEvolution = Seq((1, 10, 30, 1), (2, 20, 40, 2), (3, 5, 6, 7)) + .asInstanceOf[List[(Integer, Integer, Integer, Integer)]] + .toDF("key", "a", "b", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x") + ) + + testEvolution("source nested map struct key in different order")( + targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"), + sourceData = Seq((1, 10, 30, 1), (2, 20, 40, 2)).toDF("key", "a", "b", "value") + .selectExpr("key", "map(named_struct('b', b, 'a', a), value) as x"), + clauses = update("*") :: insert("*") :: Nil, + expected = Seq((1, 30, 10, 1), (2, 40, 20, 2), (3, 5, 6, 7)) + .asInstanceOf[List[(Integer, Integer, Integer, Integer)]] + .toDF("key", "a", "b", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"), + expectedWithoutEvolution = Seq((1, 30, 10, 1), (2, 40, 20, 2), (3, 5, 6, 7)) + .asInstanceOf[List[(Integer, Integer, Integer, Integer)]] + .toDF("key", "a", "b", "value") + .selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x") + ) + + testEvolution( + "source struct nested in map array keys contains more columns")( + targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value") + .selectExpr("key", "map(array(named_struct('a', a, 'b', b)), value) as x"), + sourceData = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2)).toDF("key", "a", "b", "c", "value") + .selectExpr("key", "map(array(named_struct('a', a, 'b', b, 'c', c)), value) as x"), + clauses = update("*") :: insert("*") :: Nil, + expected = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2), (3, 5, 6, null, 7)) + .asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer)]] + .toDF("key", "a", "b", "c", "value") + .selectExpr("key", "map(array(named_struct('a', a, 'b', b, 'c', c)), value) as x"), + expectErrorWithoutEvolutionContains = "cannot cast" + ) + + testEvolution("struct evolution in both map keys and values")( + targetData = Seq((1, 2, 3, 4, 5), (3, 6, 7, 8, 9)).toDF("key", "a", "b", "d", "e") + .selectExpr("key", "map(named_struct('a', a, 'b', b), named_struct('d', d, 'e', e)) as x"), + sourceData = Seq((1, 10, 30, 50, 70, 90, 110), (2, 20, 40, 60, 80, 100, 120)) + .toDF("key", "a", "b", "c", "d", "e", "f") + .selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), named_struct('d', d, 'e', e, 'f', f)) as x"), + clauses = update("*") :: insert("*") :: Nil, + expected = Seq((1, 10, 30, 50, 70, 90, 110), (2, 20, 40, 60, 80, 100, 120), (3, 6, 7, null, 8, 9, null)) + .asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer, Integer, Integer)]] + .toDF("key", "a", "b", "c", "d", "e", "f") + .selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), named_struct('d', d, 'e', e, 'f', f)) as x"), + expectErrorWithoutEvolutionContains = "cannot cast" + ) // scalastyle:on line.size.limit testEvolution("new column with update * and insert non-*")(