diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingTestSparkUtils.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingTestSparkUtils.scala
index 8f963c83c8..18b2396a4d 100644
--- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingTestSparkUtils.scala
+++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingTestSparkUtils.scala
@@ -85,7 +85,10 @@ trait DeltaSharingTestSparkUtils extends DeltaSQLTestUtils {
 
   protected def createSimpleTable(tableName: String, enableCdf: Boolean): Unit = {
     val tablePropertiesStr = if (enableCdf) {
-      "TBLPROPERTIES (delta.enableChangeDataFeed = true)"
+      """TBLPROPERTIES (
+        |delta.minReaderVersion=1,
+        |delta.minWriterVersion=4,
+        |delta.enableChangeDataFeed = true)""".stripMargin
     } else {
       ""
     }
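Note: the sharing test change above pins explicit protocol versions so the CDF table keeps the legacy (1, 4) protocol instead of being normalized to a table-features protocol. A minimal sketch of the behavior it relies on (assuming a Spark session with the Delta extensions enabled; the table name is illustrative):

    spark.sql("""CREATE TABLE cdf_tbl (id INT) USING delta
      |TBLPROPERTIES (
      |  delta.minReaderVersion = 1,
      |  delta.minWriterVersion = 4,
      |  delta.enableChangeDataFeed = true)""".stripMargin)
    // With the versions pinned, the protocol normalizes to Protocol(1, 4); without
    // them, this patch makes the same CREATE produce Protocol(1, 7) with explicit
    // table features.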
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index ada9854f4b..d4df3128d0 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -322,12 +322,12 @@ object Protocol {
    *
    * This function returns the protocol versions and features individually instead of a
    * [[Protocol]], so the caller can identify the features that caused the protocol version. For
-   * example, if the return values are (2, 5, columnMapping), the caller can safely ignore all
-   * other features required by the protocol with a reader and writer version of 2 and 5.
+   * example, if the return values are (2, 5, columnMapping + preceding features), the caller
+   * can safely ignore all other features required by the protocol with a reader and writer
+   * version of 2 and 5.
    *
-   * Note that this method does not consider protocol versions and features configured in session
-   * defaults. To make them effective, copy them to `metadata` using
-   * [[DeltaConfigs.mergeGlobalConfigs]].
+   * Note that this method does not consider features configured in session defaults.
+   * To make them effective, copy them to `metadata` using [[DeltaConfigs.mergeGlobalConfigs]].
    */
   def minProtocolComponentsFromMetadata(
       spark: SparkSession,
@@ -343,46 +343,11 @@ object Protocol {
       spark, metadata, Protocol().withFeatures(tablePropEnabledFeatures))
     val allEnabledFeatures = tablePropEnabledFeatures ++ metaEnabledFeatures
 
-    // Determine the min reader and writer version required by features in table properties or
-    // metadata.
-    // If any table property is specified:
-    //   we start from (3, 7) or (0, 7) depending on the existence of any writer-only feature.
-    // If there's no table property:
-    //   if no feature is enabled or all features are legacy, we start from (0, 0);
-    //   if any feature is native and is reader-writer, we start from (3, 7);
-    //   otherwise we start from (0, 7) because there must exist a native writer-only feature.
-    var (readerVersionFromFeatures, writerVersionFromFeatures) = {
-      if (tablePropEnabledFeatures.exists(_.isReaderWriterFeature)) {
-        (TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
-      } else if (tablePropEnabledFeatures.nonEmpty) {
-        (0, TABLE_FEATURES_MIN_WRITER_VERSION)
-      } else if (metaEnabledFeatures.forall(_.isLegacyFeature)) { // also true for empty set
-        (0, 0)
-      } else if (metaEnabledFeatures.exists(f => !f.isLegacyFeature && f.isReaderWriterFeature)) {
-        (TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
-      } else {
-        (0, TABLE_FEATURES_MIN_WRITER_VERSION)
-      }
-    }
-    allEnabledFeatures.foreach { feature =>
-      readerVersionFromFeatures = math.max(readerVersionFromFeatures, feature.minReaderVersion)
-      writerVersionFromFeatures = math.max(writerVersionFromFeatures, feature.minWriterVersion)
-    }
-
     // Protocol version provided in table properties can upgrade the protocol, but only when they
     // are higher than which required by the enabled features.
     val (readerVersionFromTableConfOpt, writerVersionFromTableConfOpt) =
       getProtocolVersionsFromTableConf(tableConf)
 
-    // Decide the final protocol version:
-    // a. 1, aka the lowest version possible
-    // b. version required by manually enabled features and metadata features
-    // c. version defined as table properties
-    val finalReaderVersion =
-      Seq(1, readerVersionFromFeatures, readerVersionFromTableConfOpt.getOrElse(0)).max
-    val finalWriterVersion =
-      Seq(1, writerVersionFromFeatures, writerVersionFromTableConfOpt.getOrElse(0)).max
-
     // If the user explicitly sets the table versions, we need to take into account the
     // relevant implicit features.
     val implicitFeaturesFromTableConf =
@@ -399,7 +364,14 @@ object Protocol {
       case _ => Set.empty
     }
 
-    (finalReaderVersion, finalWriterVersion, allEnabledFeatures ++ implicitFeaturesFromTableConf)
+    // Construct the minimum required protocol for the enabled features.
+    val minProtocol = Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
+      .withFeatures(allEnabledFeatures ++ implicitFeaturesFromTableConf)
+      .normalized
+
+    // Return the minimum protocol components.
+    (minProtocol.minReaderVersion, minProtocol.minWriterVersion,
+      minProtocol.implicitlyAndExplicitlySupportedFeatures)
   }
 
   /**
@@ -488,32 +460,12 @@ object Protocol {
       spark: SparkSession,
       metadata: Metadata,
       current: Protocol): Option[Protocol] = {
-    val (readerVersion, writerVersion, minRequiredFeatures) =
-      minProtocolComponentsFromAutomaticallyEnabledFeatures(spark, metadata, current)
-
-    // If the user sets the protocol versions we need to take it account. In general,
-    // enabling legacy features on legacy protocols results to pumping up the protocol
-    // versions. However, setting table feature protocol versions while enabling
-    // legacy features results to only enabling the requested features. For example:
-    // 1) Create table with (1, 2), then ALTER TABLE with DeltaConfigs.CHANGE_DATA_FEED.key = true
-    //    results to (1, 4).
-    // 2) Alternatively, Create table with (1, 2), then
-    //    ALTER TABLE set versions (1, 7) and DeltaConfigs.CHANGE_DATA_FEED.key = true results
-    //    to (1, 7, AppendOnly, Invariants, CDF).
-    val readerVersionFromConf =
-      Protocol.getReaderVersionFromTableConf(metadata.configuration).getOrElse(readerVersion)
-    val writerVersionFromConf =
-      Protocol.getWriterVersionFromTableConf(metadata.configuration).getOrElse(writerVersion)
-
-    val finalReaderVersion =
-      Seq(readerVersion, readerVersionFromConf, current.minReaderVersion).max
-    val finalWriterVersion =
-      Seq(writerVersion, writerVersionFromConf, current.minWriterVersion).max
-
-    // Increment the reader and writer version to accurately add enabled legacy table features
-    // either to the implicitly enabled table features or the table feature lists.
+
     val required =
-      Protocol(finalReaderVersion, finalWriterVersion).withFeatures(minRequiredFeatures)
+      Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
+        .withFeatures(extractAutomaticallyEnabledFeatures(spark, metadata, current))
+        .normalized
+
     if (!required.canUpgradeTo(current)) {
       // When the current protocol does not satisfy metadata requirement, some additional features
       // must be supported by the protocol. We assert those features can actually perform the
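This is the heart of the patch: both `minProtocolComponentsFromMetadata` and the upgrade path stop hand-computing minimum version numbers. They build a denormalized table-features protocol, attach every enabled feature, and let `.normalized` shrink the versions. A minimal sketch of the resulting behavior, inferred from the test expectations later in this diff (assumes the delta-spark internals are on the classpath):

    import org.apache.spark.sql.delta._
    import org.apache.spark.sql.delta.actions.Protocol

    // Start from the table-features versions with the concrete feature set attached.
    val denormalized = Protocol(3, 7).withFeatures(Seq(
      AppendOnlyTableFeature, InvariantsTableFeature, ColumnMappingTableFeature))

    // `normalized` picks the lowest versions that still cover the set: column mapping
    // forces reader version 2, and since the set matches no legacy protocol exactly,
    // the writer version stays 7 with the features listed explicitly.
    assert(denormalized.normalized === Protocol(2, 7).withFeatures(Seq(
      AppendOnlyTableFeature, InvariantsTableFeature, ColumnMappingTableFeature)))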
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala
index f8bde8d239..75ca38b828 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSQLSuite.scala
@@ -308,9 +308,16 @@ class DeltaCDCSQLSuite extends DeltaCDCSuiteBase with DeltaColumnMappingTestUtil
       // We set CDC to be enabled by default, so this should automatically bump the writer protocol
       // to the required version.
       if (columnMappingEnabled) {
-        assert(log.snapshot.protocol == Protocol(2, 5))
+        assert(log.update().protocol == Protocol(2, 7).withFeatures(Seq(
+          AppendOnlyTableFeature,
+          InvariantsTableFeature,
+          ChangeDataFeedTableFeature,
+          ColumnMappingTableFeature)))
       } else {
-        assert(log.snapshot.protocol == Protocol(1, 4))
+        assert(log.update().protocol == Protocol(1, 7).withFeatures(Seq(
+          AppendOnlyTableFeature,
+          InvariantsTableFeature,
+          ChangeDataFeedTableFeature)))
       }
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
index 4359e5c562..6d89818be9 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
@@ -482,12 +482,13 @@ class DeltaColumnMappingSuite extends QueryTest
       expectedSchema: StructType,
       ignorePhysicalName: Boolean,
       mode: String,
-      createNewTable: Boolean = true)(fn: => Unit): Unit = {
+      createNewTable: Boolean = true,
+      tableFeaturesProtocolExpected: Boolean = true)(fn: => Unit): Unit = {
     withTable(tableName) {
       fn
       checkProperties(tableName,
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = if (tableFeaturesProtocolExpected) 7 else 5,
         mode = Some(mode),
         curMaxId = DeltaColumnMapping.findMaxColumnId(expectedSchema)
       )
@@ -826,7 +827,7 @@ class DeltaColumnMappingSuite extends QueryTest
       checkSchema("t1", schemaWithId)
       checkProperties("t1",
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = 7,
         mode = Some(mode),
         curMaxId = DeltaColumnMapping.findMaxColumnId(schemaWithId)
       )
@@ -849,7 +850,7 @@ class DeltaColumnMappingSuite extends QueryTest
       checkProperties("t1",
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = 7,
         mode = Some(mode),
         curMaxId =
           DeltaColumnMapping.findMaxColumnId(schemaWithIdNested))
       checkSchema(
@@ -871,7 +872,7 @@ class DeltaColumnMappingSuite extends QueryTest
 
       checkProperties("t1",
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = 7,
         mode = Some(mode),
         curMaxId = curMaxId)
 
@@ -886,7 +887,7 @@ class DeltaColumnMappingSuite extends QueryTest
       )
       checkProperties("t1",
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = 7,
         mode = Some(mode),
         curMaxId = curMaxId2)
       checkSchema("t1",
@@ -938,7 +939,7 @@ class DeltaColumnMappingSuite extends QueryTest
 
       checkProperties("t1",
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = 7,
         mode = Some(mode),
         curMaxId = curMaxId)
       checkSchema("t1",
@@ -960,7 +961,7 @@ class DeltaColumnMappingSuite extends QueryTest
       )
       checkProperties("t1",
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = 7,
         mode = Some(mode),
         curMaxId = curMaxId2)
       checkSchema("t1",
@@ -998,7 +999,7 @@ class DeltaColumnMappingSuite extends QueryTest
 
       checkProperties("t1",
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = 7,
         mode = Some(mode),
         curMaxId = curMaxId)
       checkSchema("t1", schemaWithId)
@@ -1013,7 +1014,7 @@ class DeltaColumnMappingSuite extends QueryTest
 
       checkProperties("t1",
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = 7,
         mode = Some(mode),
         curMaxId = curMaxId)
 
@@ -1037,7 +1038,7 @@ class DeltaColumnMappingSuite extends QueryTest
       val curMaxId2 = DeltaColumnMapping.findMaxColumnId(schemaWithId) + 1
       checkProperties("t1",
         readerVersion = 2,
-        writerVersion = 5,
+        writerVersion = 7,
         mode = Some(mode),
         curMaxId = curMaxId2)
       checkSchema("t1", schemaWithId.add("c", StringType, true, withId(3)))
@@ -1627,7 +1628,8 @@ class DeltaColumnMappingSuite extends QueryTest
       schemaWithDottedColumnNames,
       false,
       "name",
-      createNewTable = false
+      createNewTable = false,
+      tableFeaturesProtocolExpected = false
     ) {
       sql(s"CREATE TABLE t1 (${schemaWithDottedColumnNames.toDDL}) USING DELTA")
       alterTableWithProps("t1", props = Map(
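Note: every `writerVersion = 5` expectation above moves to 7, because a table that enables column mapping through metadata now lands on a table-features protocol; the new `tableFeaturesProtocolExpected = false` flag is reserved for the one test that pins legacy versions explicitly. Roughly, under this patch (sketch; the table name is illustrative):

    spark.sql("""CREATE TABLE cm_tbl (a STRING) USING delta
      |TBLPROPERTIES ('delta.columnMapping.mode' = 'name')""".stripMargin)
    // Expected after this patch: Protocol(2, 7) with appendOnly, invariants and
    // columnMapping as explicit features; the same DDL previously produced (2, 5).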
"delta.minWriterVersion" -> - Protocol.forNewTable(spark, Some(metadata)).minWriterVersion.toString, - "key" -> "value") + expectedProperties: _*) if (table == "`delta_test`") { val tableName = s"$catalogName.default.delta_test" diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala index b492ca3e24..ec36c423e0 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala @@ -156,6 +156,38 @@ class DeltaProtocolTransitionsSuite extends DeltaProtocolTransitionsBaseSuite { expectedProtocol = Protocol(3, 7).withFeature(TestRemovableReaderWriterFeature)) } + test("Setting partial versions") { + testProtocolTransition( + createTableProperties = Seq( + ("delta.minWriterVersion", 3.toString)), + expectedProtocol = Protocol(1, 3)) + + testProtocolTransition( + alterTableProperties = Seq( + ("delta.minWriterVersion", 3.toString)), + expectedProtocol = Protocol(1, 3)) + + testProtocolTransition( + createTableProperties = Seq( + ("delta.minWriterVersion", 3.toString), + (s"delta.feature.${DeletionVectorsTableFeature.name}", "supported")), + expectedProtocol = Protocol(3, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + CheckConstraintsTableFeature, + DeletionVectorsTableFeature))) + + testProtocolTransition( + alterTableProperties = Seq( + ("delta.minWriterVersion", 3.toString), + (s"delta.feature.${DeletionVectorsTableFeature.name}", "supported")), + expectedProtocol = Protocol(3, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + CheckConstraintsTableFeature, + DeletionVectorsTableFeature))) + } + for ((readerVersion, writerVersion) <- Seq((2, 1), (2, 2), (2, 3), (2, 4), (1, 5))) test("Invalid legacy protocol normalization" + s" - invalidProtocol($readerVersion, $writerVersion)") { @@ -448,17 +480,42 @@ class DeltaProtocolTransitionsSuite extends DeltaProtocolTransitionsBaseSuite { test("Default Enabled legacy features") { testProtocolTransition( createTableProperties = Seq((DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), - expectedProtocol = Protocol(1, 4)) + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + ChangeDataFeedTableFeature))) testProtocolTransition( createTableProperties = Seq( ("delta.minReaderVersion", 1.toString), ("delta.minWriterVersion", 3.toString), (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + CheckConstraintsTableFeature, + ChangeDataFeedTableFeature))) + + testProtocolTransition( + createTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString), + (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), + expectedProtocol = Protocol(1, 4)) + + testProtocolTransition( + alterTableProperties = Seq( + ("delta.minReaderVersion", 1.toString), + ("delta.minWriterVersion", 4.toString), + (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)), expectedProtocol = Protocol(1, 4)) withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") { - testProtocolTransition(expectedProtocol = Protocol(1, 4)) + testProtocolTransition( + expectedProtocol = Protocol(1, 7).withFeatures(Seq( + AppendOnlyTableFeature, + InvariantsTableFeature, + ChangeDataFeedTableFeature))) 
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala
index b492ca3e24..ec36c423e0 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolTransitionsSuite.scala
@@ -156,6 +156,38 @@ class DeltaProtocolTransitionsSuite extends DeltaProtocolTransitionsBaseSuite {
       expectedProtocol = Protocol(3, 7).withFeature(TestRemovableReaderWriterFeature))
   }
 
+  test("Setting partial versions") {
+    testProtocolTransition(
+      createTableProperties = Seq(
+        ("delta.minWriterVersion", 3.toString)),
+      expectedProtocol = Protocol(1, 3))
+
+    testProtocolTransition(
+      alterTableProperties = Seq(
+        ("delta.minWriterVersion", 3.toString)),
+      expectedProtocol = Protocol(1, 3))
+
+    testProtocolTransition(
+      createTableProperties = Seq(
+        ("delta.minWriterVersion", 3.toString),
+        (s"delta.feature.${DeletionVectorsTableFeature.name}", "supported")),
+      expectedProtocol = Protocol(3, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        CheckConstraintsTableFeature,
+        DeletionVectorsTableFeature)))
+
+    testProtocolTransition(
+      alterTableProperties = Seq(
+        ("delta.minWriterVersion", 3.toString),
+        (s"delta.feature.${DeletionVectorsTableFeature.name}", "supported")),
+      expectedProtocol = Protocol(3, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        CheckConstraintsTableFeature,
+        DeletionVectorsTableFeature)))
+  }
+
   for ((readerVersion, writerVersion) <- Seq((2, 1), (2, 2), (2, 3), (2, 4), (1, 5)))
   test("Invalid legacy protocol normalization" +
     s" - invalidProtocol($readerVersion, $writerVersion)") {
@@ -448,17 +480,42 @@ class DeltaProtocolTransitionsSuite extends DeltaProtocolTransitionsBaseSuite {
   test("Default Enabled legacy features") {
     testProtocolTransition(
       createTableProperties = Seq((DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)),
-      expectedProtocol = Protocol(1, 4))
+      expectedProtocol = Protocol(1, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        ChangeDataFeedTableFeature)))
 
     testProtocolTransition(
       createTableProperties = Seq(
         ("delta.minReaderVersion", 1.toString),
         ("delta.minWriterVersion", 3.toString),
         (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)),
+      expectedProtocol = Protocol(1, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        CheckConstraintsTableFeature,
+        ChangeDataFeedTableFeature)))
+
+    testProtocolTransition(
+      createTableProperties = Seq(
+        ("delta.minReaderVersion", 1.toString),
+        ("delta.minWriterVersion", 4.toString),
+        (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)),
+      expectedProtocol = Protocol(1, 4))
+
+    testProtocolTransition(
+      alterTableProperties = Seq(
+        ("delta.minReaderVersion", 1.toString),
+        ("delta.minWriterVersion", 4.toString),
+        (DeltaConfigs.CHANGE_DATA_FEED.key, true.toString)),
       expectedProtocol = Protocol(1, 4))
 
     withSQLConf(DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true") {
-      testProtocolTransition(expectedProtocol = Protocol(1, 4))
+      testProtocolTransition(
+        expectedProtocol = Protocol(1, 7).withFeatures(Seq(
+          AppendOnlyTableFeature,
+          InvariantsTableFeature,
+          ChangeDataFeedTableFeature)))
     }
 
     testProtocolTransition(
@@ -497,7 +554,10 @@ class DeltaProtocolTransitionsSuite extends DeltaProtocolTransitionsBaseSuite {
     testProtocolTransition(
       createTableColumns = Seq(("id", "INT")),
       createTableGeneratedColumns = Seq(("id2", "INT", "id + 1")),
-      expectedProtocol = Protocol(1, 4))
+      expectedProtocol = Protocol(1, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        GeneratedColumnsTableFeature)))
 
     testProtocolTransition(
       createTableColumns = Seq(("id", "INT")),
@@ -516,10 +576,6 @@ class DeltaProtocolTransitionsSuite extends DeltaProtocolTransitionsBaseSuite {
       expectedProtocol = Protocol(1, 7).withFeature(GeneratedColumnsTableFeature))
   }
 
-  testProtocolTransition(
-    alterTableProperties = Seq((DeltaConfigs.CHANGE_DATA_FEED.key, "true")),
-    expectedProtocol = Protocol(1, 4))
-
   testProtocolTransition(
     alterTableProperties = Seq(
       ("delta.minReaderVersion", 1.toString),
@@ -534,7 +590,27 @@ class DeltaProtocolTransitionsSuite extends DeltaProtocolTransitionsBaseSuite {
   test("Column Mapping does not require a manual protocol versions upgrade") {
     testProtocolTransition(
       createTableProperties = Seq((DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")),
-      expectedProtocol = Protocol(2, 5))
+      expectedProtocol = Protocol(2, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        ColumnMappingTableFeature)))
+
+    withSQLConf(DeltaSQLConf.TABLE_FEATURES_TEST_FEATURES_ENABLED.key -> false.toString) {
+      testProtocolTransition(
+        createTableProperties = Seq(
+          ("delta.minReaderVersion", 1.toString),
+          ("delta.minWriterVersion", 4.toString),
+          (DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")),
+        expectedProtocol = Protocol(2, 5))
+
+      testProtocolTransition(
+        createTableProperties = Seq(
+          ("delta.minReaderVersion", 1.toString),
+          ("delta.minWriterVersion", 4.toString)),
+        alterTableProperties = Seq(
+          (DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")),
+        expectedProtocol = Protocol(2, 5))
+    }
 
     testProtocolTransition(
       createTableProperties = Seq(
@@ -553,7 +629,10 @@ class DeltaProtocolTransitionsSuite extends DeltaProtocolTransitionsBaseSuite {
 
     testProtocolTransition(
       alterTableProperties = Seq((DeltaConfigs.COLUMN_MAPPING_MODE.key, "name")),
-      expectedProtocol = Protocol(2, 5))
+      expectedProtocol = Protocol(2, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        ColumnMappingTableFeature)))
 
     testProtocolTransition(
       alterTableProperties = Seq(
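Note: the new "Setting partial versions" test pins down the rule for a partially specified protocol: the missing version component defaults to 1, and stacking a native feature on top denormalizes to (3, 7) while making the legacy features implied by writer version 3 explicit. A sketch of the second case outside the test harness (illustrative table name; `deletionVectors` is the feature name as used above):

    spark.sql("""CREATE TABLE dv_tbl (id INT) USING delta
      |TBLPROPERTIES (
      |  'delta.minWriterVersion' = '3',
      |  'delta.feature.deletionVectors' = 'supported')""".stripMargin)
    // Expected: Protocol(3, 7) with appendOnly, invariants and checkConstraints
    // (implied by writer version 3) plus deletionVectors, all listed explicitly.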
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
index 0a4b3c542e..b0e34c49ad 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaProtocolVersionSuite.scala
@@ -103,7 +103,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
     readerVersion = 1,
     writerVersion = 1,
     sqlConfs = Seq((DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey, "true")),
-    expectedProtocol = Protocol(1, 1).merge(ChangeDataFeedTableFeature.minProtocolVersion))
+    expectedProtocol = Protocol(1, 7).withFeature(ChangeDataFeedTableFeature))
   testEmptyFolder(
     readerVersion = 1,
     writerVersion = 1,
@@ -1053,7 +1053,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
     "creating a new table with default protocol - requiring more recent protocol version") {
     val tableName = "delta_test"
     def testTableCreation(fn: String => Unit, tableInitialized: Boolean = false): Unit =
-      testCreation(tableName, 2, tableInitialized)(fn)
+      testCreation(tableName, 7, tableInitialized)(fn)
 
     testTableCreation { dir =>
       spark.range(10).writeTo(tableName).using("delta")
@@ -1115,7 +1115,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
         sql(s"CREATE TABLE $tbl (id bigint) USING delta LOCATION '${dir.getCanonicalPath}'")
       }
       val deltaLog = DeltaLog.forTable(spark, dir)
-      assert(deltaLog.snapshot.protocol.minWriterVersion === 1,
+      assert(deltaLog.update().protocol.minWriterVersion === 1,
         "Should've picked up the protocol from the configuration")
 
       // Replace the table and make sure the config is picked up
@@ -1123,13 +1123,13 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
         spark.range(10).writeTo(tbl).using("delta")
           .tableProperty("location", dir.getCanonicalPath).replace()
       }
-      assert(deltaLog.snapshot.protocol.minWriterVersion === 2,
+      assert(deltaLog.update().protocol.minWriterVersion === 2,
         "Should've picked up the protocol from the configuration")
 
      // Will not downgrade without special flag.
      withSQLConf(DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> "1") {
        sql(s"REPLACE TABLE $tbl (id bigint) USING delta LOCATION '${dir.getCanonicalPath}'")
-        assert(deltaLog.snapshot.protocol.minWriterVersion === 2,
+        assert(deltaLog.update().protocol.minWriterVersion === 2,
          "Should not pick up the protocol from the configuration")
      }
 
@@ -1138,23 +1138,23 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
         DeltaSQLConf.DELTA_PROTOCOL_DEFAULT_WRITER_VERSION.key -> "1",
         DeltaSQLConf.REPLACE_TABLE_PROTOCOL_DOWNGRADE_ALLOWED.key -> "true") {
         sql(s"REPLACE TABLE $tbl (id bigint) USING delta LOCATION '${dir.getCanonicalPath}'")
-        assert(deltaLog.snapshot.protocol.minWriterVersion === 1,
+        assert(deltaLog.update().protocol.minWriterVersion === 1,
           "Should've created a new protocol")
 
         sql(s"CREATE OR REPLACE TABLE $tbl (id bigint NOT NULL) USING delta " +
           s"LOCATION '${dir.getCanonicalPath}'")
-        assert(deltaLog.snapshot.protocol.minWriterVersion === 2,
+        assert(deltaLog.update().protocol === Protocol(1, 7).withFeature(InvariantsTableFeature),
           "Invariant should require the higher protocol")
 
         // Go back to version 1
         sql(s"REPLACE TABLE $tbl (id bigint) USING delta LOCATION '${dir.getCanonicalPath}'")
-        assert(deltaLog.snapshot.protocol.minWriterVersion === 1,
+        assert(deltaLog.update().protocol.minWriterVersion === 1,
           "Should've created a new protocol")
 
         // Check table properties with different syntax
         spark.range(10).writeTo(tbl).tableProperty("location", dir.getCanonicalPath)
           .tableProperty("delta.appendOnly", "true").using("delta").createOrReplace()
-        assert(deltaLog.snapshot.protocol.minWriterVersion === 2,
+        assert(deltaLog.update().protocol === Protocol(1, 7).withFeature(AppendOnlyTableFeature),
           "appendOnly should require the higher protocol")
       }
     }
@@ -1368,8 +1368,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
         " delta.minWriterVersion='2'," +
         " delta.enableChangeDataFeed='true'" +
         ")")
-      assert(deltaLog.snapshot.protocol.minReaderVersion === 1)
-      assert(deltaLog.snapshot.protocol.minWriterVersion === 4)
+      assert(deltaLog.update().protocol === Protocol(1, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        ChangeDataFeedTableFeature)))
     }
   }
 
@@ -1701,12 +1703,15 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
       DeltaConfigs.MIN_READER_VERSION.key -> "1",
       DeltaConfigs.MIN_WRITER_VERSION.key -> "1",
       DeltaConfigs.CHANGE_DATA_FEED.key -> "true"),
-    expectedFinalProtocol = Some(Protocol(1, 4)))
+    expectedFinalProtocol = Some(Protocol(1, 7).withFeatures(Seq(
+      AppendOnlyTableFeature,
+      InvariantsTableFeature,
+      ChangeDataFeedTableFeature))))
 
   testAlterTable(
     "legacy protocol, legacy feature, metadata",
     Map("delta.appendOnly" -> "true"),
-    expectedFinalProtocol = Some(Protocol(1, 2)))
+    expectedFinalProtocol = Some(Protocol(1, 7).withFeature(AppendOnlyTableFeature)))
 
   testAlterTable(
     "legacy protocol, legacy feature, feature property",
@@ -1925,7 +1930,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
         .tableProperty("delta.appendOnly", "true")
         .using("delta")
         .create()
-      val protocolOfNewTable = Protocol(1, 2)
+      val protocolOfNewTable = Protocol(1, 7).withFeature(AppendOnlyTableFeature)
       assert(deltaLog.update().protocol === protocolOfNewTable)
 
       val e = intercept[DeltaTableFeatureException] {
@@ -1976,8 +1981,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
         sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` (id bigint) USING delta " +
           "TBLPROPERTIES (delta.minWriterVersion=1, delta.appendOnly=true)")
 
-        assert(deltaLog.snapshot.protocol.minWriterVersion === 2)
-        assertPropertiesAndShowTblProperties(deltaLog)
+        assert(deltaLog.update().protocol === Protocol(1, 7).withFeature(AppendOnlyTableFeature))
+        assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true)
       }
     }
   }
@@ -1992,8 +1997,8 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
           .using("delta")
           .create()
 
-        assert(deltaLog.snapshot.protocol.minWriterVersion === 2)
-        assertPropertiesAndShowTblProperties(deltaLog)
+        assert(deltaLog.update().protocol === Protocol(1, 7).withFeature(AppendOnlyTableFeature))
+        assertPropertiesAndShowTblProperties(deltaLog, tableHasFeatures = true)
       }
     }
   }
@@ -2075,7 +2080,7 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
           |ALTER TABLE delta.`${log.dataPath.toString}`
          |SET TBLPROPERTIES ('delta.appendOnly' = 'true')
         """.stripMargin)
-      assert(log.snapshot.protocol.minWriterVersion === 2)
+      assert(log.update().protocol === Protocol(1, 7).withFeature(AppendOnlyTableFeature))
     }
   }
 
@@ -2101,7 +2106,10 @@ trait DeltaProtocolVersionSuiteBase extends QueryTest
           | 'delta.minWriterVersion' = '2',
          | 'delta.enableChangeDataFeed' = 'true'
          |)""".stripMargin)
-      assert(log.snapshot.protocol.minWriterVersion === 4)
+      assert(log.update().protocol === Protocol(1, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        ChangeDataFeedTableFeature)))
     }
   }
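Note: two edits recur throughout the protocol version suite. First, `log.snapshot` becomes `log.update()` so assertions read the freshly committed protocol rather than a cached snapshot. Second, enabling a single legacy feature through metadata now normalizes to a table-features protocol instead of bumping legacy versions. A sketch of the latter (illustrative table name):

    spark.sql("ALTER TABLE t SET TBLPROPERTIES ('delta.appendOnly' = 'true')")
    // Old expectation: legacy Protocol(1, 2).
    // New expectation: Protocol(1, 7).withFeature(AppendOnlyTableFeature).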
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
index 9d7c826f81..5dcb76e0d1 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTableFeatureSuite.scala
@@ -406,9 +406,10 @@ class DeltaTableFeatureSuite
     withTable("tbl") {
       spark.range(0).write.format("delta").saveAsTable("tbl")
       val log = DeltaLog.forTable(spark, TableIdentifier("tbl"))
-      val protocol = log.update().protocol
-      assert(protocol.minReaderVersion === 2)
-      assert(protocol.minWriterVersion === 5)
+      assert(log.update().protocol === Protocol(2, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        ColumnMappingTableFeature)))
       val tblProperties = Seq(s"'$FEATURE_PROP_PREFIX${TestWriterFeature.name}' = 'enabled'",
         s"'delta.minWriterVersion' = $TABLE_FEATURES_MIN_WRITER_VERSION")
       sql(buildTablePropertyModifyingCommand(
@@ -416,16 +417,9 @@ class DeltaTableFeatureSuite
       val newProtocol = log.update().protocol
       assert(newProtocol.readerAndWriterFeatureNames === Set(
         AppendOnlyTableFeature.name,
-        ColumnMappingTableFeature.name,
         InvariantsTableFeature.name,
-        CheckConstraintsTableFeature.name,
-        ChangeDataFeedTableFeature.name,
-        GeneratedColumnsTableFeature.name,
-        TestWriterFeature.name,
-        TestLegacyWriterFeature.name,
-        TestLegacyReaderWriterFeature.name,
-        TestRemovableLegacyWriterFeature.name,
-        TestRemovableLegacyReaderWriterFeature.name))
+        ColumnMappingTableFeature.name,
+        TestWriterFeature.name))
     }
   }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala
index 43adac75df..28243d6806 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/GeneratedColumnSuite.scala
@@ -21,6 +21,7 @@ import java.io.PrintWriter
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.delta.actions.Protocol
 import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, InvariantViolationException, SchemaUtils}
 import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY
@@ -998,28 +999,25 @@ trait GeneratedColumnSuiteBase
 
   test("using generated columns should upgrade the protocol") {
     withTableName("upgrade_protocol") { table =>
-      def protocolVersions: (Int, Int) = {
-        sql(s"DESC DETAIL $table")
-          .select("minReaderVersion", "minWriterVersion")
-          .as[(Int, Int)]
-          .head()
-      }
-
-      // Use the default protocol versions when not using computed partitions
+      // Use the default protocol versions when not using computed partitions.
       createTable(table, None, "i INT", Map.empty, Seq.empty)
-      assert(protocolVersions == (1, 2))
+      val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName = table))
+      assert(deltaLog.update().protocol == Protocol(1, 2))
       assert(DeltaLog.forTable(spark, TableIdentifier(tableName = table)).snapshot.version == 0)
 
-      // Protocol versions should be upgraded when using computed partitions
+      // Protocol versions should be upgraded when using computed partitions.
       replaceTable(
         table,
         None,
         defaultTestTableSchema,
        defaultTestTableGeneratedColumns,
        defaultTestTablePartitionColumns)
-      assert(protocolVersions == (1, 4))
+      assert(deltaLog.update().protocol == Protocol(1, 7).withFeatures(Seq(
+        AppendOnlyTableFeature,
+        InvariantsTableFeature,
+        GeneratedColumnsTableFeature)))
 
       // Make sure we did overwrite the table rather than deleting and re-creating.
-      assert(DeltaLog.forTable(spark, TableIdentifier(tableName = table)).snapshot.version == 1)
+      assert(DeltaLog.forTable(spark, TableIdentifier(tableName = table)).update().version == 1)
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
index 00a146dd09..7aac422bd8 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/IdentityColumnSuite.scala
@@ -333,7 +333,7 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
       TestColumnSpec(colName = "id", dataType = LongType),
       TestColumnSpec(colName = "value", dataType = IntegerType))
     )
-    assert(getProtocolVersions == (1, 2) || getProtocolVersions == (2, 5))
+    assert(getProtocolVersions == (1, 2) || getProtocolVersions == (2, 7))
     assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 0)
 
     replaceTable(
@@ -347,8 +347,11 @@ trait IdentityColumnSuiteBase extends IdentityColumnTestUtils {
         TestColumnSpec(colName = "value", dataType = IntegerType)
       )
     )
-    assert(getProtocolVersions == (1, 6) || getProtocolVersions == (2, 6))
-    assert(DeltaLog.forTable(spark, TableIdentifier(tblName)).snapshot.version == 1)
+    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tblName))
+    val protocol = deltaLog.update().protocol
+    assert(getProtocolVersions == (1, 7) ||
+      protocol.readerAndWriterFeatures.contains(IdentityColumnsTableFeature))
+    assert(deltaLog.update().version == 1)
   }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala
index f52307ef39..3f438d2a04 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/RestoreTableSuiteBase.scala
@@ -251,11 +251,11 @@ trait RestoreTableSuiteBase extends QueryTest with SharedSparkSession
       deltaLog.upgradeProtocol(
         Protocol(TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION)
           .withFeatures(Seq(TestLegacyReaderWriterFeature))
-          .withFeatures(oldProtocolVersion.implicitlySupportedFeatures))
+          .withFeatures(oldProtocolVersion.implicitlyAndExplicitlySupportedFeatures))
       val newProtocolVersion = deltaLog.snapshot.protocol
       assert(
         newProtocolVersion.minReaderVersion > oldProtocolVersion.minReaderVersion &&
-          newProtocolVersion.minWriterVersion > oldProtocolVersion.minWriterVersion,
+          newProtocolVersion.minWriterVersion >= oldProtocolVersion.minWriterVersion,
         s"newProtocolVersion=$newProtocolVersion is not strictly greater than" +
         s" oldProtocolVersion=$oldProtocolVersion")
 
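Note: the restore and invariant suites switch from comparing raw version numbers to membership checks on `implicitlyAndExplicitlySupportedFeatures`, which unions the features implied by legacy versions with those listed explicitly, so the assertion holds under either protocol representation. A sketch of why that is the robust check (feature and class names as used in this diff):

    // Both protocols support CHECK constraints, but only the first encodes it
    // purely in the writer version:
    val legacy = Protocol(1, 3)
    val withFeature = Protocol(1, 7).withFeature(CheckConstraintsTableFeature)
    assert(legacy.implicitlyAndExplicitlySupportedFeatures
      .contains(CheckConstraintsTableFeature))
    assert(withFeature.implicitlyAndExplicitlySupportedFeatures
      .contains(CheckConstraintsTableFeature))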
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala
index 157f444b47..7fe7e1288f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala
@@ -408,8 +408,9 @@ class InvariantEnforcementSuite extends QueryTest
         configuration = txn.metadata.configuration +
           ("delta.constraints.mychk" -> "valueA < valueB"))
       txn.commit(Seq(newMetadata), DeltaOperations.ManualUpdate)
-      assert(table.deltaLog.update().protocol.minWriterVersion ===
-        CheckConstraintsTableFeature.minWriterVersion)
+      val protocol = table.deltaLog.update().protocol
+      assert(protocol.implicitlyAndExplicitlySupportedFeatures
+        .contains(CheckConstraintsTableFeature))
       spark.sql("INSERT INTO constraint VALUES (50, 100, null)")
       val e = intercept[InvariantViolationException] {
         spark.sql("INSERT INTO constraint VALUES (100, 50, null)")