Skip to content

Commit

Permalink
Move Profile options into shareCredentialsOptions map
Browse files Browse the repository at this point in the history
Signed-off-by: Steven Ayers <steven@ayers.io>
  • Loading branch information
stevenayers committed Sep 14, 2024
1 parent 9bc8322 commit 961b66a
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import org.apache.spark.delta.sharing.TableRefreshResult
import io.delta.sharing.client.DeltaSharingProfile.{validateNotNullAndEmpty, BEARER_TOKEN,
OAUTH_CLIENT_CREDENTIALS}
import io.delta.sharing.client.util.JsonUtils
import io.delta.sharing.spark.DeltaSharingOptions

@JsonDeserialize(using = classOf[DeltaSharingProfileDeserializer])
sealed trait DeltaSharingProfile {
Expand Down Expand Up @@ -205,11 +204,11 @@ private[sharing] class DeltaSharingFileProfileProvider(
* Load [[DeltaSharingProfile]] from options.
*/
private[sharing] class DeltaSharingOptionsProfileProvider(
options: DeltaSharingOptions) extends DeltaSharingProfileProvider {
shareCredentialsOptions: Map[String, String]) extends DeltaSharingProfileProvider {

val profile = {
val profile = {
JsonUtils.fromJson[DeltaSharingProfile](JsonUtils.toJson(options))
JsonUtils.fromJson[DeltaSharingProfile](JsonUtils.toJson(shareCredentialsOptions))
}
profile.validate()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,28 +105,7 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {
str
}.getOrElse(RESPONSE_FORMAT_PARQUET)

val shareCredentialsVersion = options.get(PROFILE_SHARE_CREDENTIALS_VERSION).map { str =>
Try(str.toInt).toOption.getOrElse {
throw DeltaSharingErrors.illegalDeltaSharingOptionException(
PROFILE_SHARE_CREDENTIALS_VERSION, str, "must be an integer")
}
}

val `type` = options.get(PROFILE_SHARE_CREDENTIALS_TYPE)

val endpoint = options.get(PROFILE_ENDPOINT)

val tokenEndpoint = options.get(PROFILE_TOKEN_ENDPOINT)

val clientId = options.get(PROFILE_CLIENT_ID)

val clientSecret = options.get(PROFILE_CLIENT_SECRET)

val scope = options.get(PROFILE_SCOPE)

val bearerToken = options.get(PROFILE_BEARER_TOKEN)

val expirationTime = options.get(PROFILE_EXPIRATION_TIME).map(getFormattedTimestamp(_))
val shareCredentialsOptions: Map[String, String] = prepareShareCredentialsOptions()

def isTimeTravel: Boolean = versionAsOf.isDefined || timestampAsOf.isDefined

Expand Down Expand Up @@ -157,6 +136,21 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {
}
}

private def prepareShareCredentialsOptions(): Map[String, String] = {
validShareCredentialsOptions.filter { option =>
options.contains(option._1)
}.map { option =>
val key = option._1
val value = key match {
case PROFILE_EXPIRATION_TIME =>
getFormattedTimestamp(options.get(key).get)
case _ =>
options.get(key).get
}
key -> value
}
}

private def validateOneStartingOption(): Unit = {
if (startingTimestamp.isDefined && startingVersion.isDefined) {
throw DeltaSharingErrors.versionAndTimestampBothSetException(
Expand Down Expand Up @@ -220,7 +214,7 @@ object DeltaSharingOptions extends Logging {
val RESPONSE_FORMAT_DELTA = "delta"

val PROFILE_SHARE_CREDENTIALS_VERSION = "shareCredentialsVersion"
val PROFILE_SHARE_CREDENTIALS_TYPE = "type"
val PROFILE_TYPE = "type"
val PROFILE_ENDPOINT = "endpoint"
val PROFILE_TOKEN_ENDPOINT = "tokenEndpoint"
val PROFILE_CLIENT_ID = "clientId"
Expand All @@ -237,6 +231,18 @@ object DeltaSharingOptions extends Logging {
CDF_START_VERSION -> "",
CDF_END_VERSION -> ""
)

val validShareCredentialsOptions = Map(
PROFILE_SHARE_CREDENTIALS_VERSION -> "",
PROFILE_TYPE -> "",
PROFILE_ENDPOINT -> "",
PROFILE_TOKEN_ENDPOINT -> "",
PROFILE_CLIENT_ID -> "",
PROFILE_CLIENT_SECRET -> "",
PROFILE_SCOPE -> "",
PROFILE_BEARER_TOKEN -> "",
PROFILE_EXPIRATION_TIME -> ""
)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,24 @@ package io.delta.sharing.client

import org.apache.spark.SparkFunSuite

import io.delta.sharing.spark.DeltaSharingOptions



class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {

private def testProfile(options: DeltaSharingOptions, expected: DeltaSharingProfile): Unit = {
assert(new DeltaSharingOptionsProfileProvider(options)
private def testProfile(
shareCredentialsOptions: Map[String, String], expected: DeltaSharingProfile): Unit = {
assert(new DeltaSharingOptionsProfileProvider(shareCredentialsOptions)
.getProfile == expected)

}

test("parse") {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "1",
"endpoint" -> "foo",
"bearerToken" -> "bar",
"expirationTime" -> "2021-11-12T00:12:29.0Z"
)),
"expirationTime" -> "2021-11-12T00:12:29Z"
),
DeltaSharingProfile(
shareCredentialsVersion = Some(1),
endpoint = "foo",
Expand All @@ -49,11 +47,11 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {

test("expirationTime is optional") {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "1",
"endpoint" -> "foo",
"bearerToken" -> "bar"
)),
),
DeltaSharingProfile(
shareCredentialsVersion = Some(1),
endpoint = "foo",
Expand All @@ -65,10 +63,10 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {
test("shareCredentialsVersion is missing") {
val e = intercept[IllegalArgumentException] {
testProfile(
new DeltaSharingOptions(Map(
Map(
"endpoint" -> "foo",
"bearerToken" -> "bar"
)),
),
null
)
}
Expand All @@ -79,11 +77,11 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {
test("shareCredentialsVersion is incorrect") {
val e = intercept[IllegalArgumentException] {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "2",
"endpoint" -> "foo",
"bearerToken" -> "bar"
)),
),
null
)
}
Expand All @@ -94,9 +92,9 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {
test("shareCredentialsVersion is not supported") {
val e = intercept[IllegalArgumentException] {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "100"
)),
),
null
)
}
Expand All @@ -107,10 +105,10 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {
test("endpoint is missing") {
val e = intercept[IllegalArgumentException] {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "1",
"bearerToken" -> "bar"
)),
),
null
)
}
Expand All @@ -120,10 +118,10 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {
test("bearerToken is missing") {
val e = intercept[IllegalArgumentException] {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "1",
"endpoint" -> "foo"
)),
),
null
)
}
Expand All @@ -132,13 +130,13 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {

test("unknown field should be ignored") {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "1",
"endpoint" -> "foo",
"bearerToken" -> "bar",
"expirationTime" -> "2021-11-12T00:12:29Z",
"futureField" -> "xyz"
)),
),
DeltaSharingProfile(
shareCredentialsVersion = Some(1),
endpoint = "foo",
Expand All @@ -150,14 +148,14 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {

test("oauth_client_credentials profile without optional scope") {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "2",
"endpoint" -> "foo",
"tokenEndpoint" -> "bar",
"clientId" -> "abc",
"clientSecret" -> "xyz",
"type" -> "oauth_client_credentials"
)),
),
OAuthClientCredentialsDeltaSharingProfile(
shareCredentialsVersion = Some(2),
endpoint = "foo",
Expand All @@ -170,15 +168,15 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {

test("oauth_client_credentials profile with optional scope") {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "2",
"endpoint" -> "foo",
"tokenEndpoint" -> "bar",
"clientId" -> "abc",
"clientSecret" -> "xyz",
"type" -> "oauth_client_credentials",
"scope" -> "testScope"
)),
),
OAuthClientCredentialsDeltaSharingProfile(
shareCredentialsVersion = Some(2),
endpoint = "foo",
Expand All @@ -193,15 +191,15 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {
test("oauth_client_credentials only supports version 2") {
val e = intercept[IllegalArgumentException] {
testProfile(
new DeltaSharingOptions(Map(
Map(
"shareCredentialsVersion" -> "1",
"endpoint" -> "foo",
"tokenEndpoint" -> "bar",
"clientId" -> "abc",
"clientSecret" -> "xyz",
"type" -> "oauth_client_credentials",
"scope" -> "testScope"
)),
),
null
)
}
Expand All @@ -213,12 +211,12 @@ class DeltaSharingOptionsProfileProviderSuite extends SparkFunSuite {

for (missingField <- mandatoryFields) {

val profile = new DeltaSharingOptions(
val profile = {
mandatoryFields
.filter(_ != missingField)
.map(f => f -> "value")
.toMap + ("shareCredentialsVersion" -> "2", "type" -> "oauth_client_credentials")
)
}

val e = intercept[IllegalArgumentException] {
testProfile(profile, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,11 @@ class DeltaSharingOptionsSuite extends SparkFunSuite {
assert(options.options.get(DeltaSharingOptions.CDF_END_VERSION) == Some("2"))
assert(options.options.get(DeltaSharingOptions.CDF_END_TIMESTAMP) == Some("2020-01-01"))
assert(options.options.get("notreservedoption") == Some("random"))
}

test("Parse shareCredentials map successfully") {
// profile as opts
options = new DeltaSharingOptions(Map(
var options = new DeltaSharingOptions(Map(
"shareCredentialsVersion" -> "1",
"type" -> "bearer_token",
"endpoint" -> "foo",
Expand All @@ -94,15 +96,25 @@ class DeltaSharingOptionsSuite extends SparkFunSuite {
"bearerToken" -> "bar",
"expirationTime" -> "2022-01-01T00:00:00-02:00"
))
assert(options.shareCredentialsVersion == Some(1))
assert(options.`type` == Some("bearer_token"))
assert(options.endpoint == Some("foo"))
assert(options.tokenEndpoint == Some("bar"))
assert(options.clientId == Some("abc"))
assert(options.clientSecret == Some("xyz"))
assert(options.scope == Some("testScope"))
assert(options.bearerToken == Some("bar"))
assert(options.expirationTime == Some("2022-01-01T02:00:00Z"))

assert(options.shareCredentialsOptions.get(
DeltaSharingOptions.PROFILE_SHARE_CREDENTIALS_VERSION) == Some("1"))
assert(options.shareCredentialsOptions.get(
DeltaSharingOptions.PROFILE_TYPE) == Some("bearer_token"))
assert(options.shareCredentialsOptions.get(
DeltaSharingOptions.PROFILE_ENDPOINT) == Some("foo"))
assert(options.shareCredentialsOptions.get(
DeltaSharingOptions.PROFILE_TOKEN_ENDPOINT) == Some("bar"))
assert(options.shareCredentialsOptions.get(
DeltaSharingOptions.PROFILE_CLIENT_ID) == Some("abc"))
assert(options.shareCredentialsOptions.get(
DeltaSharingOptions.PROFILE_CLIENT_SECRET) == Some("xyz"))
assert(options.shareCredentialsOptions.get(
DeltaSharingOptions.PROFILE_SCOPE) == Some("testScope"))
assert(options.shareCredentialsOptions.get(
DeltaSharingOptions.PROFILE_BEARER_TOKEN) == Some("bar"))
assert(options.shareCredentialsOptions.get(
DeltaSharingOptions.PROFILE_EXPIRATION_TIME) == Some("2022-01-01T02:00:00Z"))

}

Expand Down

0 comments on commit 961b66a

Please sign in to comment.