Improve Spark Tests Framework #18551

Merged
Changes from 4 commits
6 changes: 6 additions & 0 deletions sdk/cosmos/.gitignore
@@ -0,0 +1,6 @@

*.log
.gitignore

metastore_db/*
spark-warehouse/*
5 changes: 5 additions & 0 deletions sdk/cosmos/azure-cosmos-spark_3-0_2-12/.gitignore
@@ -0,0 +1,5 @@
*.log
.gitignore

metastore_db/*
spark-warehouse/*
35 changes: 35 additions & 0 deletions sdk/cosmos/azure-cosmos-spark_3-0_2-12/Dev-README.md
@@ -0,0 +1,35 @@
# Dev Azure Cosmos DB OLTP Spark connector client library for Java

### How to run the tests

To run the Spark connector tests without running the SDK tests, first install azure-cosmos:
```bash
mvn -e -DskipTests -Dgpg.skip -Dmaven.javadoc.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos -am clean install
```

To run unit tests:
```bash
mvn -e -Dgpg.skip -Dmaven.javadoc.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-spark_3-0_2-12 test package -Punit
```

To run the integration tests (a Cosmos DB endpoint is required):

Create the file `~/cosmos-v4.properties` with the following content (modify it to match your Cosmos DB endpoint and key):

```properties
ACCOUNT_HOST=https://192.168.1.51:8081
ACCOUNT_KEY=C2y6yDjf5/R+ob0N8A7Cgv30VRDJIWEHLM+4QDU5DE2nQ9nDuVTqobD4b8mGGyPMbIZnqyMsEcaGQy67XIw/Jw==
```

Run the following command to run the end-to-end integration tests:

```bash
mvn -e -Dgpg.skip -Dmaven.javadoc.skip=true -Dspotbugs.skip=true -Dcheckstyle.skip=true -Drevapi.skip=true -pl ,azure-cosmos-spark_3-0_2-12 test package -PsparkE2E
```
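
Note: the integration specs resolve the account endpoint and key through the SDK's `TestConfigurations` helper (`TestConfigurations.HOST` / `TestConfigurations.MASTER_KEY`), which presumably picks up the values from `~/cosmos-v4.properties` above. As a rough sketch (the object name is made up; the catalog config keys are the ones used by the specs in this PR), this is how the test account typically gets wired into a Spark catalog:

```scala
import com.azure.cosmos.implementation.TestConfigurations
import org.apache.spark.sql.SparkSession

// Hypothetical sketch, not part of this PR: wires the test account into the Cosmos catalog.
object CatalogWiringSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("cosmos catalog sketch")
      .master("local")
      .getOrCreate()

    // Register the Cosmos catalog and point it at the test account.
    spark.conf.set("spark.sql.catalog.testCatalog", "com.azure.cosmos.spark.CosmosCatalog")
    spark.conf.set("spark.sql.catalog.testCatalog.spark.cosmos.accountEndpoint", TestConfigurations.HOST)
    spark.conf.set("spark.sql.catalog.testCatalog.spark.cosmos.accountKey", TestConfigurations.MASTER_KEY)

    // Example DDL against the catalog (database name is hypothetical):
    // spark.sql("CREATE DATABASE testCatalog.mydatabase WITH DBPROPERTIES ('manualThroughput' = '1000');")

    spark.close()
  }
}
```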

### How to run the style check
```bash
mvn -e -Dgpg.skip -DskipTests -Dmaven.javadoc.skip=true -Dspotbugs.skip=false -Dcheckstyle.skip=false -Drevapi.skip=true -pl ,azure-cosmos-spark_3-0_2-12 -am clean package
```


@@ -3,160 +3,94 @@
package com.azure.cosmos.spark

import com.azure.cosmos.implementation.TestConfigurations
import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder, CosmosException}
import org.apache.commons.lang3.RandomStringUtils
import org.apache.spark.sql.SparkSession
import org.assertj.core.api.Assertions.assertThat
// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

// TODO moderakh do proper clean up for spark session, client, etc
// TODO: moderakh should we tag tests at the test class level or test method level?
// TODO: moderakh do we need to recreate spark for each test or should we use a common instance?
// TODO: moderakh rely on the shared database/container for the tests to avoid creating many
// TODO: moderakh develop the proper pattern for proper resource cleanup after test
// we need to clean up databases after creation.

class CosmosCatalogSpec extends IntegrationSpec {
class CosmosCatalogSpec extends IntegrationSpec with CosmosClient {
//scalastyle:off multiple.string.literals
//scalastyle:off magic.number

it should "create a database with shared throughput" taggedAs (RequiresCosmosEndpoint) in {
var spark : SparkSession = _

override def beforeAll(): Unit = {
super.beforeAll()
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val client = new CosmosClientBuilder()
.endpoint(cosmosEndpoint)
.key(cosmosMasterKey)
.buildAsyncClient()

val spark = SparkSession.builder()
spark = SparkSession.builder()
.appName("spark connector sample")
.master("local")
.enableHiveSupport()
.getOrCreate()

spark.conf.set(s"spark.sql.catalog.testCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set(s"spark.sql.catalog.testCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set(s"spark.sql.catalog.testCatalog.spark.cosmos.accountKey", cosmosMasterKey)
}

val databaseName = RandomStringUtils.randomAlphabetic(5)

spark.sql(s"CREATE DATABASE testCatalog.${databaseName} WITH DBPROPERTIES ('manualThroughput' = '1000');")

client.getDatabase(databaseName).read().block()
val throughput = client.getDatabase(databaseName).readThroughput().block()
assertThat(throughput.getProperties.getManualThroughput).isEqualTo(1000)

client.close()
spark.close()
override def afterAll(): Unit = {
try spark.close()
finally super.afterAll()
}

it should "drops a database" taggedAs (RequiresCosmosEndpoint) in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY
"Cosmos Catalog" can "create a database with shared throughput" taggedAs (RequiresCosmosEndpoint) in {
val databaseName = getAutoCleanableDatabaseName()

val client = new CosmosClientBuilder()
.endpoint(cosmosEndpoint)
.key(cosmosMasterKey)
.buildAsyncClient()
spark.sql(s"CREATE DATABASE testCatalog.${databaseName} WITH DBPROPERTIES ('manualThroughput' = '1000');")

val spark = SparkSession.builder()
.appName("spark connector sample")
.master("local")
.getOrCreate()
cosmosClient.getDatabase(databaseName).read().block()
val throughput = cosmosClient.getDatabase(databaseName).readThroughput().block()

spark.conf.set(s"spark.sql.catalog.testCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set(s"spark.sql.catalog.testCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set(s"spark.sql.catalog.testCatalog.spark.cosmos.accountKey", cosmosMasterKey)
throughput.getProperties.getManualThroughput shouldEqual 1000
}

val databaseName = RandomStringUtils.randomAlphabetic(6)
assertThat(databaseExists(client, databaseName)).isEqualTo(false)
it can "drops a database" taggedAs (RequiresCosmosEndpoint) in {
val databaseName = getAutoCleanableDatabaseName()
spark.catalog.databaseExists(databaseName) shouldEqual false

createDatabase(spark, databaseName)
assertThat(databaseExists(client, databaseName)).isEqualTo(true)
databaseExists(databaseName) shouldEqual true

dropDatabase(spark, databaseName)
assertThat(spark.catalog.databaseExists(databaseName)).isEqualTo(false)

client.close()
spark.close()
spark.catalog.databaseExists(databaseName) shouldEqual false
}

it can "create a table with defaults" taggedAs (RequiresCosmosEndpoint) in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val client = new CosmosClientBuilder()
.endpoint(cosmosEndpoint)
.key(cosmosMasterKey)
.buildAsyncClient()

val spark = SparkSession.builder()
.appName("spark connector sample")
.master("local")
.enableHiveSupport()
.getOrCreate()

spark.conf.set(s"spark.sql.catalog.cosmoscatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set(s"spark.sql.catalog.cosmoscatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set(s"spark.sql.catalog.cosmoscatalog.spark.cosmos.accountKey", cosmosMasterKey)

val databaseName = RandomStringUtils.randomAlphabetic(5).toLowerCase
val databaseName = getAutoCleanableDatabaseName()
val containerName = RandomStringUtils.randomAlphabetic(6).toLowerCase + System.currentTimeMillis()
cleanupDatabaseLater(databaseName)

spark.sql(s"CREATE DATABASE cosmoscatalog.${databaseName};")
spark.sql(s"CREATE TABLE cosmoscatalog.${databaseName}.${containerName} (word STRING, number INT) using cosmos.items;")
spark.sql(s"CREATE DATABASE testCatalog.${databaseName};")
spark.sql(s"CREATE TABLE testCatalog.${databaseName}.${containerName} (word STRING, number INT) using cosmos.items;")

val containerProperties = client.getDatabase(databaseName).getContainer(containerName).read().block().getProperties
val containerProperties = cosmosClient.getDatabase(databaseName).getContainer(containerName).read().block().getProperties

// verify default partition key path is used
containerProperties.getPartitionKeyDefinition.getPaths.asScala.toArray should equal(Array("/id"))

// validate throughput

val throughput = client.getDatabase(databaseName).getContainer(containerName).readThroughput().block().getProperties
val throughput = cosmosClient.getDatabase(databaseName).getContainer(containerName).readThroughput().block().getProperties
throughput.getManualThroughput shouldEqual 400

client.close()
spark.close()
}

it should "create a table with customized properties" taggedAs (RequiresCosmosEndpoint) in {
val cosmosEndpoint = TestConfigurations.HOST
val cosmosMasterKey = TestConfigurations.MASTER_KEY

val client = new CosmosClientBuilder()
.endpoint(cosmosEndpoint)
.key(cosmosMasterKey)
.buildAsyncClient()

val spark = SparkSession.builder()
.appName("spark connector sample")
.master("local")
.enableHiveSupport()
.getOrCreate()

spark.conf.set(s"spark.sql.catalog.cosmoscatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set(s"spark.sql.catalog.cosmoscatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set(s"spark.sql.catalog.cosmoscatalog.spark.cosmos.accountKey", cosmosMasterKey)

val databaseName = RandomStringUtils.randomAlphabetic(5).toLowerCase
it can "create a table with customized properties" taggedAs (RequiresCosmosEndpoint) in {
val databaseName = getAutoCleanableDatabaseName()
val containerName = RandomStringUtils.randomAlphabetic(6).toLowerCase + System.currentTimeMillis()

spark.sql(s"CREATE DATABASE cosmoscatalog.${databaseName};")
spark.sql(s"CREATE TABLE cosmoscatalog.${databaseName}.${containerName} (word STRING, number INT) using cosmos.items " +
spark.sql(s"CREATE DATABASE testCatalog.${databaseName};")
spark.sql(s"CREATE TABLE testCatalog.${databaseName}.${containerName} (word STRING, number INT) using cosmos.items " +

Reviewer comment (Member): Unrelated, but I really love that you have wired up the metadata changes already. I am sure Spark developers will love this feature; I don't know how many customers I have seen take a dependency on some crappy, very old Python SDK just to be able to create or modify containers.

s"TBLPROPERTIES(partitionKeyPath = '/mypk', manualThroughput = '1100')")

val containerProperties = client.getDatabase(databaseName).getContainer(containerName).read().block().getProperties
val containerProperties = cosmosClient.getDatabase(databaseName).getContainer(containerName).read().block().getProperties
containerProperties.getPartitionKeyDefinition.getPaths.asScala.toArray should equal(Array("/mypk"))

// validate throughput
val throughput = client.getDatabase(databaseName).getContainer(containerName).readThroughput().block().getProperties
val throughput = cosmosClient.getDatabase(databaseName).getContainer(containerName).readThroughput().block().getProperties
throughput.getManualThroughput shouldEqual 1100

client.close()
spark.close()
}

private def createDatabase(spark: SparkSession, databaseName: String) = {
@@ -167,14 +101,6 @@ class CosmosCatalogSpec extends IntegrationSpec {
spark.sql(s"DROP DATABASE testCatalog.${databaseName};")
}

private def databaseExists(client: CosmosAsyncClient, databaseName: String) = {
try {
client.getDatabase(databaseName).read().block()
true
} catch {
case e: CosmosException if e.getStatusCode == 404 => false
}
}
//scalastyle:on magic.number
//scalastyle:on multiple.string.literals
}
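
The helpers used by the updated spec above (`cosmosClient`, `getAutoCleanableDatabaseName()`, `cleanupDatabaseLater()`, `databaseExists()`) come from the new `CosmosClient` mixin, whose source is not included in this excerpt. A minimal hypothetical sketch of what such a fixture could look like, with method names inferred from the call sites above and ScalaTest's `BeforeAndAfterAll` assumed as the lifecycle hook:

```scala
package com.azure.cosmos.spark

import com.azure.cosmos.implementation.TestConfigurations
import com.azure.cosmos.{CosmosAsyncClient, CosmosClientBuilder, CosmosException}
import org.apache.commons.lang3.RandomStringUtils
import org.scalatest.{BeforeAndAfterAll, Suite}

import scala.collection.mutable

// Hypothetical reconstruction of the CosmosClient test mixin; the PR's actual trait may differ.
trait CosmosClient extends BeforeAndAfterAll {
  this: Suite =>

  // Shared async client, created once per suite in beforeAll.
  protected var cosmosClient: CosmosAsyncClient = _

  // Databases registered here are deleted in afterAll.
  private val databasesToCleanUp = mutable.ListBuffer[String]()

  override def beforeAll(): Unit = {
    super.beforeAll()
    cosmosClient = new CosmosClientBuilder()
      .endpoint(TestConfigurations.HOST)
      .key(TestConfigurations.MASTER_KEY)
      .buildAsyncClient()
  }

  override def afterAll(): Unit = {
    try {
      databasesToCleanUp.foreach { name =>
        try cosmosClient.getDatabase(name).delete().block()
        catch { case _: CosmosException => } // database was already removed
      }
      cosmosClient.close()
    } finally super.afterAll()
  }

  // Returns a random database name and schedules it for deletion after the suite.
  protected def getAutoCleanableDatabaseName(): String = {
    val name = RandomStringUtils.randomAlphabetic(5).toLowerCase
    cleanupDatabaseLater(name)
    name
  }

  protected def cleanupDatabaseLater(databaseName: String): Unit = {
    databasesToCleanUp += databaseName
  }

  protected def databaseExists(databaseName: String): Boolean = {
    try {
      cosmosClient.getDatabase(databaseName).read().block()
      true
    } catch {
      case e: CosmosException if e.getStatusCode == 404 => false
    }
  }
}
```

With a fixture like this, each test only asks for an auto-cleanable database name, and teardown of both the databases and the shared client happens once per suite instead of inside every test.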
@@ -2,24 +2,22 @@
// Licensed under the MIT License.
package com.azure.cosmos.spark

import org.assertj.core.api.Assertions.assertThat

class CosmosConfigSpec extends UnitSpec {
//scalastyle:off multiple.string.literals

"account endpoint" should "be parsed" in {
"Config Parser" should "parse account credentials" in {
val userConfig = Map(
"spark.cosmos.accountEndpoint" -> "https://localhsot:8081",
"spark.cosmos.accountKey" -> "xyz"
)

val endpointConfig = CosmosAccountConfig.parseCosmosAccountConfig(userConfig)

assertThat(endpointConfig.endpoint).isEqualTo( "https://localhsot:8081")
assertThat(endpointConfig.key).isEqualTo( "xyz")
endpointConfig.endpoint shouldEqual "https://localhsot:8081"
endpointConfig.key shouldEqual "xyz"
}

"account endpoint" should "be validated" in {
it should "validate account endpoint" in {
val userConfig = Map(
"spark.cosmos.accountEndpoint" -> "invalidUrl",
"spark.cosmos.accountKey" -> "xyz"
@@ -29,13 +27,13 @@ class CosmosConfigSpec extends UnitSpec {
CosmosAccountConfig.parseCosmosAccountConfig(userConfig)
fail("invalid URL")
} catch {
case e: Exception => assertThat(e.getMessage).isEqualTo(
case e: Exception => e.getMessage shouldEqual
"invalid configuration for spark.cosmos.accountEndpoint:invalidUrl." +
" Config description: Cosmos DB Account Endpoint Uri")
" Config description: Cosmos DB Account Endpoint Uri"
}
}

"account endpoint" should "mandatory config" in {
it should "complain if mandatory config is missing" in {
val userConfig = Map(
"spark.cosmos.accountKey" -> "xyz"
)
@@ -44,9 +42,9 @@
CosmosAccountConfig.parseCosmosAccountConfig(userConfig)
fail("missing URL")
} catch {
case e: Exception => assertThat(e.getMessage).isEqualTo(
case e: Exception => e.getMessage shouldEqual
"mandatory option spark.cosmos.accountEndpoint is missing." +
" Config description: Cosmos DB Account Endpoint Uri")
" Config description: Cosmos DB Account Endpoint Uri"
}
}
//scalastyle:on multiple.string.literals
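
For context on the API exercised above: `CosmosAccountConfig.parseCosmosAccountConfig` takes a plain `Map[String, String]` of user options and exposes `endpoint` and `key` on the result. A minimal hypothetical usage sketch (the object name is made up, and this assumes the parser is visible to the caller; invalid or missing options throw with the messages asserted in the negative tests):

```scala
import com.azure.cosmos.spark.CosmosAccountConfig

object AccountConfigSketch {
  def main(args: Array[String]): Unit = {
    val userConfig = Map(
      "spark.cosmos.accountEndpoint" -> "https://localhost:8081",
      "spark.cosmos.accountKey" -> "xyz"
    )

    // Parsing validates that the endpoint is a well-formed URL and that mandatory options are present.
    val accountConfig = CosmosAccountConfig.parseCosmosAccountConfig(userConfig)

    println(accountConfig.endpoint) // https://localhost:8081
    println(accountConfig.key)      // xyz
  }
}
```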
@@ -5,7 +5,6 @@ package com.azure.cosmos.spark
import com.fasterxml.jackson.databind.node.ArrayNode
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{ArrayType, IntegerType, NullType, StringType, StructField, StructType}
import org.assertj.core.api.Assertions.assertThat

class CosmosRowConverterSpec extends UnitSpec {
//scalastyle:off null
@@ -23,8 +22,8 @@ class CosmosRowConverterSpec extends UnitSpec {
StructType(Seq(StructField(colName1, IntegerType), StructField(colName2, StringType))))

val objectNode = CosmosRowConverter.rowToObjectNode(row)
assertThat(objectNode.get(colName1).asInt()).isEqualTo(colVal1)
assertThat(objectNode.get(colName2).asText()).isEqualTo(colVal2)
objectNode.get(colName1).asInt() shouldEqual colVal1
objectNode.get(colName2).asText() shouldEqual colVal2
}

"null type in spark row" should "translate to null in ObjectNode" in {
@@ -39,8 +38,8 @@
StructType(Seq(StructField(colName1, NullType), StructField(colName2, StringType))))

val objectNode = CosmosRowConverter.rowToObjectNode(row)
assertThat(objectNode.get(colName1).isNull).isTrue
assertThat(objectNode.get(colName2).asText()).isEqualTo(colVal2)
objectNode.get(colName1).isNull shouldBe true
objectNode.get(colName2).asText() shouldEqual colVal2
}

"array in spark row" should "translate to null in ArrayNode" in {
@@ -53,12 +52,12 @@
StructType(Seq(StructField(colName1, ArrayType(StringType, true)), StructField(colName2, StringType))))

val objectNode = CosmosRowConverter.rowToObjectNode(row)
assertThat(objectNode.get(colName1).isArray)
assertThat(objectNode.get(colName1).asInstanceOf[ArrayNode]).hasSize(2)
assertThat(objectNode.get(colName1).asInstanceOf[ArrayNode].get(0).asText()).isEqualTo("arrayElement1")
assertThat(objectNode.get(colName1).asInstanceOf[ArrayNode].get(1).asText()).isEqualTo("arrayElement2")
objectNode.get(colName1).isArray shouldBe true
objectNode.get(colName1).asInstanceOf[ArrayNode] should have size 2
objectNode.get(colName1).asInstanceOf[ArrayNode].get(0).asText() shouldEqual "arrayElement1"
objectNode.get(colName1).asInstanceOf[ArrayNode].get(1).asText() shouldEqual "arrayElement2"

assertThat(objectNode.get(colName2).asText()).isEqualTo(colVal1)
objectNode.get(colName2).asText() shouldEqual colVal1
}
//scalastyle:on null
//scalastyle:on multiple.string.literals