From 961b66a95bc25e16427d7f7b8ae8561d87f2e0c9 Mon Sep 17 00:00:00 2001 From: Steven Ayers Date: Sat, 14 Sep 2024 09:45:35 +0100 Subject: [PATCH] Move Profile options into shareCredentialsOptions map Signed-off-by: Steven Ayers --- .../client/DeltaSharingProfileProvider.scala | 5 +- .../sharing/spark/DeltaSharingOptions.scala | 52 +++++++++-------- ...taSharingOptionsProfileProviderSuite.scala | 58 +++++++++---------- .../spark/DeltaSharingOptionsSuite.scala | 32 ++++++---- 4 files changed, 81 insertions(+), 66 deletions(-) diff --git a/client/src/main/scala/io/delta/sharing/client/DeltaSharingProfileProvider.scala b/client/src/main/scala/io/delta/sharing/client/DeltaSharingProfileProvider.scala index dd46fb687..af05a0d71 100644 --- a/client/src/main/scala/io/delta/sharing/client/DeltaSharingProfileProvider.scala +++ b/client/src/main/scala/io/delta/sharing/client/DeltaSharingProfileProvider.scala @@ -31,7 +31,6 @@ import org.apache.spark.delta.sharing.TableRefreshResult import io.delta.sharing.client.DeltaSharingProfile.{validateNotNullAndEmpty, BEARER_TOKEN, OAUTH_CLIENT_CREDENTIALS} import io.delta.sharing.client.util.JsonUtils -import io.delta.sharing.spark.DeltaSharingOptions @JsonDeserialize(using = classOf[DeltaSharingProfileDeserializer]) sealed trait DeltaSharingProfile { @@ -205,11 +204,11 @@ private[sharing] class DeltaSharingFileProfileProvider( * Load [[DeltaSharingProfile]] from options. */ private[sharing] class DeltaSharingOptionsProfileProvider( - options: DeltaSharingOptions) extends DeltaSharingProfileProvider { + shareCredentialsOptions: Map[String, String]) extends DeltaSharingProfileProvider { val profile = { val profile = { - JsonUtils.fromJson[DeltaSharingProfile](JsonUtils.toJson(options)) + JsonUtils.fromJson[DeltaSharingProfile](JsonUtils.toJson(shareCredentialsOptions)) } profile.validate() diff --git a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala index 3585e2f68..95ab75a6a 100644 --- a/client/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala +++ b/client/src/main/scala/io/delta/sharing/spark/DeltaSharingOptions.scala @@ -105,28 +105,7 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser { str }.getOrElse(RESPONSE_FORMAT_PARQUET) - val shareCredentialsVersion = options.get(PROFILE_SHARE_CREDENTIALS_VERSION).map { str => - Try(str.toInt).toOption.getOrElse { - throw DeltaSharingErrors.illegalDeltaSharingOptionException( - PROFILE_SHARE_CREDENTIALS_VERSION, str, "must be an integer") - } - } - - val `type` = options.get(PROFILE_SHARE_CREDENTIALS_TYPE) - - val endpoint = options.get(PROFILE_ENDPOINT) - - val tokenEndpoint = options.get(PROFILE_TOKEN_ENDPOINT) - - val clientId = options.get(PROFILE_CLIENT_ID) - - val clientSecret = options.get(PROFILE_CLIENT_SECRET) - - val scope = options.get(PROFILE_SCOPE) - - val bearerToken = options.get(PROFILE_BEARER_TOKEN) - - val expirationTime = options.get(PROFILE_EXPIRATION_TIME).map(getFormattedTimestamp(_)) + val shareCredentialsOptions: Map[String, String] = prepareShareCredentialsOptions() def isTimeTravel: Boolean = versionAsOf.isDefined || timestampAsOf.isDefined @@ -157,6 +136,21 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser { } } + private def prepareShareCredentialsOptions(): Map[String, String] = { + validShareCredentialsOptions.filter { option => + options.contains(option._1) + }.map { option => + val key = option._1 + val value = key match { + case PROFILE_EXPIRATION_TIME => + getFormattedTimestamp(options.get(key).get) + case _ => + options.get(key).get + } + key -> value + } + } + private def validateOneStartingOption(): Unit = { if (startingTimestamp.isDefined && startingVersion.isDefined) { throw DeltaSharingErrors.versionAndTimestampBothSetException( @@ -220,7 +214,7 @@ object DeltaSharingOptions extends Logging { val RESPONSE_FORMAT_DELTA = "delta" val PROFILE_SHARE_CREDENTIALS_VERSION = "shareCredentialsVersion" - val PROFILE_SHARE_CREDENTIALS_TYPE = "type" + val PROFILE_TYPE = "type" val PROFILE_ENDPOINT = "endpoint" val PROFILE_TOKEN_ENDPOINT = "tokenEndpoint" val PROFILE_CLIENT_ID = "clientId" @@ -237,6 +231,18 @@ object DeltaSharingOptions extends Logging { CDF_START_VERSION -> "", CDF_END_VERSION -> "" ) + + val validShareCredentialsOptions = Map( + PROFILE_SHARE_CREDENTIALS_VERSION -> "", + PROFILE_TYPE -> "", + PROFILE_ENDPOINT -> "", + PROFILE_TOKEN_ENDPOINT -> "", + PROFILE_CLIENT_ID -> "", + PROFILE_CLIENT_SECRET -> "", + PROFILE_SCOPE -> "", + PROFILE_BEARER_TOKEN -> "", + PROFILE_EXPIRATION_TIME -> "" + ) } /** diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingOptionsProfileProviderSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingOptionsProfileProviderSuite.scala index 60e53d614..88ac15e12 100644 --- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingOptionsProfileProviderSuite.scala +++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingOptionsProfileProviderSuite.scala @@ -18,26 +18,24 @@ package io.delta.sharing.client import org.apache.spark.SparkFunSuite -import io.delta.sharing.spark.DeltaSharingOptions - class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { - private def testProfile(options: DeltaSharingOptions, expected: DeltaSharingProfile): Unit = { - assert(new DeltaSharingOptionsProfileProvider(options) + private def testProfile( + shareCredentialsOptions: Map[String, String], expected: DeltaSharingProfile): Unit = { + assert(new DeltaSharingOptionsProfileProvider(shareCredentialsOptions) .getProfile == expected) - } test("parse") { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "1", "endpoint" -> "foo", "bearerToken" -> "bar", - "expirationTime" -> "2021-11-12T00:12:29.0Z" - )), + "expirationTime" -> "2021-11-12T00:12:29Z" + ), DeltaSharingProfile( shareCredentialsVersion = Some(1), endpoint = "foo", @@ -49,11 +47,11 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("expirationTime is optional") { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "1", "endpoint" -> "foo", "bearerToken" -> "bar" - )), + ), DeltaSharingProfile( shareCredentialsVersion = Some(1), endpoint = "foo", @@ -65,10 +63,10 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("shareCredentialsVersion is missing") { val e = intercept[IllegalArgumentException] { testProfile( - new DeltaSharingOptions(Map( + Map( "endpoint" -> "foo", "bearerToken" -> "bar" - )), + ), null ) } @@ -79,11 +77,11 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("shareCredentialsVersion is incorrect") { val e = intercept[IllegalArgumentException] { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "2", "endpoint" -> "foo", "bearerToken" -> "bar" - )), + ), null ) } @@ -94,9 +92,9 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("shareCredentialsVersion is not supported") { val e = intercept[IllegalArgumentException] { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "100" - )), + ), null ) } @@ -107,10 +105,10 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("endpoint is missing") { val e = intercept[IllegalArgumentException] { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "1", "bearerToken" -> "bar" - )), + ), null ) } @@ -120,10 +118,10 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("bearerToken is missing") { val e = intercept[IllegalArgumentException] { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "1", "endpoint" -> "foo" - )), + ), null ) } @@ -132,13 +130,13 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("unknown field should be ignored") { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "1", "endpoint" -> "foo", "bearerToken" -> "bar", "expirationTime" -> "2021-11-12T00:12:29Z", "futureField" -> "xyz" - )), + ), DeltaSharingProfile( shareCredentialsVersion = Some(1), endpoint = "foo", @@ -150,14 +148,14 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("oauth_client_credentials profile without optional scope") { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "2", "endpoint" -> "foo", "tokenEndpoint" -> "bar", "clientId" -> "abc", "clientSecret" -> "xyz", "type" -> "oauth_client_credentials" - )), + ), OAuthClientCredentialsDeltaSharingProfile( shareCredentialsVersion = Some(2), endpoint = "foo", @@ -170,7 +168,7 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("oauth_client_credentials profile with optional scope") { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "2", "endpoint" -> "foo", "tokenEndpoint" -> "bar", @@ -178,7 +176,7 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { "clientSecret" -> "xyz", "type" -> "oauth_client_credentials", "scope" -> "testScope" - )), + ), OAuthClientCredentialsDeltaSharingProfile( shareCredentialsVersion = Some(2), endpoint = "foo", @@ -193,7 +191,7 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { test("oauth_client_credentials only supports version 2") { val e = intercept[IllegalArgumentException] { testProfile( - new DeltaSharingOptions(Map( + Map( "shareCredentialsVersion" -> "1", "endpoint" -> "foo", "tokenEndpoint" -> "bar", @@ -201,7 +199,7 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { "clientSecret" -> "xyz", "type" -> "oauth_client_credentials", "scope" -> "testScope" - )), + ), null ) } @@ -213,12 +211,12 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite { for (missingField <- mandatoryFields) { - val profile = new DeltaSharingOptions( + val profile = { mandatoryFields .filter(_ != missingField) .map(f => f -> "value") .toMap + ("shareCredentialsVersion" -> "2", "type" -> "oauth_client_credentials") - ) + } val e = intercept[IllegalArgumentException] { testProfile(profile, null) diff --git a/client/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsSuite.scala b/client/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsSuite.scala index 5ee187785..2d58c1029 100644 --- a/client/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsSuite.scala +++ b/client/src/test/scala/io/delta/sharing/spark/DeltaSharingOptionsSuite.scala @@ -81,9 +81,11 @@ class DeltaSharingOptionsSuite extends SparkFunSuite { assert(options.options.get(DeltaSharingOptions.CDF_END_VERSION) == Some("2")) assert(options.options.get(DeltaSharingOptions.CDF_END_TIMESTAMP) == Some("2020-01-01")) assert(options.options.get("notreservedoption") == Some("random")) + } + test("Parse shareCredentials map successfully") { // profile as opts - options = new DeltaSharingOptions(Map( + var options = new DeltaSharingOptions(Map( "shareCredentialsVersion" -> "1", "type" -> "bearer_token", "endpoint" -> "foo", @@ -94,15 +96,25 @@ class DeltaSharingOptionsSuite extends SparkFunSuite { "bearerToken" -> "bar", "expirationTime" -> "2022-01-01T00:00:00-02:00" )) - assert(options.shareCredentialsVersion == Some(1)) - assert(options.`type` == Some("bearer_token")) - assert(options.endpoint == Some("foo")) - assert(options.tokenEndpoint == Some("bar")) - assert(options.clientId == Some("abc")) - assert(options.clientSecret == Some("xyz")) - assert(options.scope == Some("testScope")) - assert(options.bearerToken == Some("bar")) - assert(options.expirationTime == Some("2022-01-01T02:00:00Z")) + + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_SHARE_CREDENTIALS_VERSION) == Some("1")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_TYPE) == Some("bearer_token")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_ENDPOINT) == Some("foo")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_TOKEN_ENDPOINT) == Some("bar")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_CLIENT_ID) == Some("abc")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_CLIENT_SECRET) == Some("xyz")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_SCOPE) == Some("testScope")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_BEARER_TOKEN) == Some("bar")) + assert(options.shareCredentialsOptions.get( + DeltaSharingOptions.PROFILE_EXPIRATION_TIME) == Some("2022-01-01T02:00:00Z")) }