Skip to content

Commit

Permalink
Support struct evolution inside maps
Browse files Browse the repository at this point in the history
## Description

This PR resolves issue #1641 to allow automatic schema evolution in structs that are inside maps.

Assuming the target and source tables have the following schemas:
target: `id string, map map<int, struct<a: int, b: int>>`
source: `id string, map map<int, struct<a: int, b: int, c: int>>`
```
SET spark.databricks.delta.schema.autoMerge.enabled = true;

MERGE INTO target t
USING source s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
```
returns an analysis error today:
```
AnalysisException: cannot resolve 's.map' due to data type mismatch: cannot cast map<string,struct<a:int,b:int>> to map<string,struct<a:int,b:int,c:string>>;
```

With this change, the merge command succeeds and the target table schema evolves to include field `c` inside the map value. The same also works for map keys.

- Tests are added to `MergeIntoSuiteBase` and `MergeIntoSQLSuite` to cover struct evolution inside of maps values and keys.

## Does this PR introduce _any_ user-facing changes?
Yes, struct evolution inside of maps now succeeds instead of failing with an analysis error, see previous example.

Closes #1868

GitOrigin-RevId: 07ce2531e03c4e2fa69e8a34f33ba8d2dc3a0228
  • Loading branch information
johanl-db authored and allisonport-db committed Jul 20, 2023
1 parent 71e0a83 commit dbb2210
Show file tree
Hide file tree
Showing 2 changed files with 344 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,31 @@ trait UpdateExpressionsSupport extends CastSupport with SQLConfHelper with Analy
ArrayType(toEt, containsNull = true)
)
}
case (from: MapType, to: MapType) if !Cast.canCast(from, to) =>
// Manually convert map keys and values if the types are not compatible to allow schema
// evolution. This is slower than direct cast so we only do it when required.
def createMapConverter(convert: (Expression, Expression) => Expression): Expression = {
val keyVar = NamedLambdaVariable("keyVar", from.keyType, nullable = false)
val valueVar =
NamedLambdaVariable("valueVar", from.valueType, from.valueContainsNull)
LambdaFunction(convert(keyVar, valueVar), Seq(keyVar, valueVar))
}

var transformedKeysAndValues = fromExpression
if (from.keyType != to.keyType) {
transformedKeysAndValues =
TransformKeys(transformedKeysAndValues, createMapConverter {
(key, _) => castIfNeeded(key, to.keyType, allowStructEvolution)
})
}

if (from.valueType != to.valueType) {
transformedKeysAndValues =
TransformValues(transformedKeysAndValues, createMapConverter {
(_, value) => castIfNeeded(value, to.valueType, allowStructEvolution)
})
}
cast(transformedKeysAndValues, to)
case (from: StructType, to: StructType)
if !DataType.equalsIgnoreCaseAndNullability(from, to) && resolveStructsByName =>
// All from fields must be present in the final schema, or we'll silently lose data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3454,6 +3454,325 @@ abstract class MergeIntoSuiteBase
"""{ "key": "A", "value": [ { "a": { "y": 20, "x": [ { "c": 10, "d": 30, "e": null } ] }, "b": 2 } ] }
{ "key": "B", "value": [ { "a": { "y": 60, "x": [ { "c": 20, "d": 50, "e": null } ] }, "b": 3 } ] }""",
expectErrorWithoutEvolutionContains = "Cannot cast")

// Struct evolution inside of map values.
testNestedStructsEvolution("new source column in map struct value")(
target =
"""{ "key": "A", "map": { "key": { "a": 1 } } }
{ "key": "C", "map": { "key": { "a": 3 } } }""",
source =
"""{ "key": "A", "map": { "key": { "a": 2, "b": 2 } } }
{ "key": "B", "map": { "key": { "a": 1, "b": 2 } } }""",
targetSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType))),
sourceSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", IntegerType))),
resultSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", IntegerType))),
clauses = update("*") :: insert("*") :: Nil,
result =
"""{ "key": "A", "map": { "key": { "a": 2, "b": 2 } } }
{ "key": "B", "map": { "key": { "a": 1, "b": 2 } } }
{ "key": "C", "map": { "key": { "a": 3, "b": null } } }""",
expectErrorWithoutEvolutionContains = "Cannot cast")

testNestedStructsEvolution("new source column in nested map struct value")(
target =
"""{"key": "A", "map": { "key": { "innerKey": { "a": 1 } } } }
{"key": "C", "map": { "key": { "innerKey": { "a": 3 } } } }""",
source =
"""{"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }
{"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }""",
targetSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
MapType(StringType, new StructType().add("a", IntegerType)))),
sourceSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))),
resultSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))),
clauses = update("*") :: insert("*") :: Nil,
result =
"""{"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }
{"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": 3 } } } }
{"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": null } } } }""",
expectErrorWithoutEvolutionContains = "Cannot cast")

testNestedStructsEvolution("source map struct value contains less columns than target")(
target =
"""{ "key": "A", "map": { "key": { "a": 1, "b": 1 } } }
{ "key": "C", "map": { "key": { "a": 3, "b": 1 } } }""",
source =
"""{ "key": "A", "map": { "key": { "a": 2 } } }
{ "key": "B", "map": { "key": { "a": 1 } } }""",
targetSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", IntegerType))),
sourceSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType))),
resultSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", IntegerType))),
clauses = update("*") :: insert("*") :: Nil,
result =
"""{ "key": "A", "map": { "key": { "a": 2, "b": null } } }
{ "key": "B", "map": { "key": { "a": 1, "b": null } } }
{ "key": "C", "map": { "key": { "a": 3, "b": 1 } } }""",
expectErrorWithoutEvolutionContains = "Cannot cast")

testNestedStructsEvolution("source nested map struct value contains less columns than target")(
target =
"""{"key": "A", "map": { "key": { "innerKey": { "a": 1, "b": 1 } } } }
{"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": 1 } } } }""",
source =
"""{"key": "A", "map": { "key": { "innerKey": { "a": 2 } } } }
{"key": "B", "map": { "key": { "innerKey": { "a": 2 } } } }""",
targetSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))),
sourceSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
MapType(StringType, new StructType().add("a", IntegerType)))),
resultSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
MapType(StringType, new StructType().add("a", IntegerType).add("b", IntegerType)))),
clauses = update("*") :: insert("*") :: Nil,
result =
"""{"key": "A", "map": { "key": { "innerKey": { "a": 2, "b": null } } } }
{"key": "B", "map": { "key": { "innerKey": { "a": 2, "b": null } } } }
{"key": "C", "map": { "key": { "innerKey": { "a": 3, "b": 1 } } } }""",
expectErrorWithoutEvolutionContains = "Cannot cast")

testNestedStructsEvolution("source nested map struct value contains different type than target")(
target =
"""{"key": "A", "map": { "key": { "a": 1, "b" : 1 } } }
{"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""",
source =
"""{"key": "A", "map": { "key": { "a": 1, "b" : "2" } } }
{"key": "B", "map": { "key": { "a": 2, "b" : "2" } } }""",
targetSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", IntegerType))),
sourceSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", StringType))),
resultSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", IntegerType))),
clauses = update("*") :: insert("*") :: Nil,
result =
"""{"key": "A", "map": { "key": { "a": 1, "b" : 2 } } }
{"key": "B", "map": { "key": { "a": 2, "b" : 2 } } }
{"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""",
resultWithoutEvolution =
"""{"key": "A", "map": { "key": { "a": 1, "b" : 2 } } }
{"key": "B", "map": { "key": { "a": 2, "b" : 2 } } }
{"key": "C", "map": { "key": { "a": 3, "b" : 1 } } }""")


testNestedStructsEvolution("source nested map struct value in different order")(
target =
"""{"key": "A", "map": { "key": { "a" : 1, "b" : 1 } } }
{"key": "C", "map": { "key": { "a" : 3, "b" : 1 } } }""",
source =
"""{"key": "A", "map": { "key": { "b" : 2, "a" : 1, "c" : 3 } } }
{"key": "B", "map": { "key": { "b" : 2, "a" : 2, "c" : 4 } } }""",
targetSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", IntegerType))),
sourceSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType))),
resultSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType))),
clauses = update("*") :: insert("*") :: Nil,
result =
"""{"key": "A", "map": { "key": { "a": 1, "b" : 2, "c" : 3 } } }
{"key": "B", "map": { "key": { "a": 2, "b" : 2, "c" : 4 } } }
{"key": "C", "map": { "key": { "a": 3, "b" : 1, "c" : null } } }""",
expectErrorWithoutEvolutionContains = "Cannot cast")

testNestedStructsEvolution("source map struct value to map array value")(
target =
"""{ "key": "A", "map": { "key": [ 1, 2 ] } }
{ "key": "C", "map": { "key": [ 3, 4 ] } }""",
source =
"""{ "key": "A", "map": { "key": { "a": 2 } } }
{ "key": "B", "map": { "key": { "a": 1 } } }""",
targetSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
ArrayType(IntegerType))),
sourceSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
new StructType().add("a", IntegerType))),
clauses = update("*") :: insert("*") :: Nil,
expectErrorContains = "Failed to merge incompatible data types",
expectErrorWithoutEvolutionContains = "Cannot cast")

testNestedStructsEvolution("source struct nested in map array values contains more columns in different order")(
target =
"""{ "key": "A", "map": { "key": [ { "a": 1, "b": 2 } ] } }
{ "key": "C", "map": { "key": [ { "a": 3, "b": 4 } ] } }""",
source =
"""{ "key": "A", "map": { "key": [ { "b": 6, "c": 7, "a": 5 } ] } }
{ "key": "B", "map": { "key": [ { "b": 9, "c": 10, "a": 8 } ] } }""",
targetSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
ArrayType(
new StructType().add("a", IntegerType).add("b", IntegerType)))),
sourceSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
ArrayType(
new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType)))),
resultSchema = new StructType()
.add("key", StringType)
.add("map", MapType(
StringType,
ArrayType(
new StructType().add("a", IntegerType).add("b", IntegerType).add("c", IntegerType)))),
clauses = update("*") :: insert("*") :: Nil,
result =
"""{ "key": "A", "map": { "key": [ { "a": 5, "b": 6, "c": 7 } ] } }
{ "key": "B", "map": { "key": [ { "a": 8, "b": 9, "c": 10 } ] } }
{ "key": "C", "map": { "key": [ { "a": 3, "b": 4, "c": null } ] } }""",
expectErrorWithoutEvolutionContains = "Cannot cast")

// Struct evolution inside of map keys.
testEvolution("new source column in map struct key")(
targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
sourceData = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2)).toDF("key", "a", "b", "c", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"),
clauses = update("*") :: insert("*") :: Nil,
expected = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2), (3, 5, 6, null, 7))
.asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer)]]
.toDF("key", "a", "b", "c", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"),
expectErrorWithoutEvolutionContains = "Cannot cast"
)

testEvolution("source nested map struct key contains less columns than target")(
targetData = Seq((1, 2, 3, 4, 5), (3, 6, 7, 8, 9)).toDF("key", "a", "b", "c", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"),
sourceData = Seq((1, 10, 50, 1), (2, 20, 60, 2)).toDF("key", "a", "c", "value")
.selectExpr("key", "map(named_struct('a', a, 'c', c), value) as x"),
clauses = update("*") :: insert("*") :: Nil,
expected = Seq((1, 10, null, 50, 1), (2, 20, null, 60, 2), (3, 6, 7, 8, 9))
.asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer)]]
.toDF("key", "a", "b", "c", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), value) as x"),
expectErrorWithoutEvolutionContains = "Cannot cast"
)

testEvolution("source nested map struct key contains different type than target")(
targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
sourceData = Seq((1, 10, "30", 1), (2, 20, "40", 2)).toDF("key", "a", "b", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
clauses = update("*") :: insert("*") :: Nil,
expected = Seq((1, 10, 30, 1), (2, 20, 40, 2), (3, 5, 6, 7))
.asInstanceOf[List[(Integer, Integer, Integer, Integer)]]
.toDF("key", "a", "b", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
expectedWithoutEvolution = Seq((1, 10, 30, 1), (2, 20, 40, 2), (3, 5, 6, 7))
.asInstanceOf[List[(Integer, Integer, Integer, Integer)]]
.toDF("key", "a", "b", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x")
)

testEvolution("source nested map struct key in different order")(
targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
sourceData = Seq((1, 10, 30, 1), (2, 20, 40, 2)).toDF("key", "a", "b", "value")
.selectExpr("key", "map(named_struct('b', b, 'a', a), value) as x"),
clauses = update("*") :: insert("*") :: Nil,
expected = Seq((1, 30, 10, 1), (2, 40, 20, 2), (3, 5, 6, 7))
.asInstanceOf[List[(Integer, Integer, Integer, Integer)]]
.toDF("key", "a", "b", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x"),
expectedWithoutEvolution = Seq((1, 30, 10, 1), (2, 40, 20, 2), (3, 5, 6, 7))
.asInstanceOf[List[(Integer, Integer, Integer, Integer)]]
.toDF("key", "a", "b", "value")
.selectExpr("key", "map(named_struct('a', a, 'b', b), value) as x")
)

testEvolution(
"source struct nested in map array keys contains more columns")(
targetData = Seq((1, 2, 3, 4), (3, 5, 6, 7)).toDF("key", "a", "b", "value")
.selectExpr("key", "map(array(named_struct('a', a, 'b', b)), value) as x"),
sourceData = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2)).toDF("key", "a", "b", "c", "value")
.selectExpr("key", "map(array(named_struct('a', a, 'b', b, 'c', c)), value) as x"),
clauses = update("*") :: insert("*") :: Nil,
expected = Seq((1, 10, 30, 50, 1), (2, 20, 40, 60, 2), (3, 5, 6, null, 7))
.asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer)]]
.toDF("key", "a", "b", "c", "value")
.selectExpr("key", "map(array(named_struct('a', a, 'b', b, 'c', c)), value) as x"),
expectErrorWithoutEvolutionContains = "cannot cast"
)

testEvolution("struct evolution in both map keys and values")(
targetData = Seq((1, 2, 3, 4, 5), (3, 6, 7, 8, 9)).toDF("key", "a", "b", "d", "e")
.selectExpr("key", "map(named_struct('a', a, 'b', b), named_struct('d', d, 'e', e)) as x"),
sourceData = Seq((1, 10, 30, 50, 70, 90, 110), (2, 20, 40, 60, 80, 100, 120))
.toDF("key", "a", "b", "c", "d", "e", "f")
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), named_struct('d', d, 'e', e, 'f', f)) as x"),
clauses = update("*") :: insert("*") :: Nil,
expected = Seq((1, 10, 30, 50, 70, 90, 110), (2, 20, 40, 60, 80, 100, 120), (3, 6, 7, null, 8, 9, null))
.asInstanceOf[List[(Integer, Integer, Integer, Integer, Integer, Integer, Integer)]]
.toDF("key", "a", "b", "c", "d", "e", "f")
.selectExpr("key", "map(named_struct('a', a, 'b', b, 'c', c), named_struct('d', d, 'e', e, 'f', f)) as x"),
expectErrorWithoutEvolutionContains = "cannot cast"
)
// scalastyle:on line.size.limit

testEvolution("new column with update * and insert non-*")(
Expand Down

0 comments on commit dbb2210

Please sign in to comment.