Cosmos Spark End to End Integration Test against Cosmos Emulator runs in CI #17952

Merged: moderakh merged 8 commits into Azure:feature/cosmos/spark30 from moderakh:users/moderakh/20201101-spark on Dec 8, 2020
Commits (8, all by moderakh):

- 74c8cf3 more code comments
- 23abec5 Merge branch 'feature/cosmos/spark30' into users/moderakh/20201101-spark
- ad763b6 spark end to end test
- 382619d added missing files
- ffbcd34 undid an unrelated change
- ce5f0dd cleanup
- 5ea9611 removed intentionally failing test
- 48830b5 updated comment
...s/azure-cosmos-spark_3-0_2-12/src/test/scala/com/azure/cosmos/spark/IntegrationSpec.scala (16 additions, 0 deletions)

```scala
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.spark

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Inside, Inspectors, OptionValues}

abstract class IntegrationSpec extends AnyFlatSpec
  with BeforeAndAfterAll
  with BeforeAndAfterEach
  with Matchers
  with OptionValues
  with Inside
  with Inspectors
```
...-cosmos-spark_3-0_2-12/src/test/scala/com/azure/cosmos/spark/RequiresCosmosEndpoint.scala (9 additions, 0 deletions)

```scala
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.spark

import org.scalatest.Tag

// Used to identify integration tests which require a Cosmos DB endpoint
object RequiresCosmosEndpoint extends Tag("requiresCosmosEndpoint")
```
...azure-cosmos-spark_3-0_2-12/src/test/scala/com/azure/cosmos/spark/SparkE2EWriteSpec.scala (90 additions, 0 deletions)

```scala
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import java.util.UUID

import com.azure.cosmos.CosmosClientBuilder
import com.azure.cosmos.implementation.TestConfigurations
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.spark.sql.SparkSession
import org.assertj.core.api.Assertions.assertThat
// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import

// TODO moderakh rely on the shared database/container for the tests to avoid creating many
// TODO moderakh we need to clean up databases after creation.
// TODO use facility from V4 SDK?
// TODO do proper clean up for spark session, client, etc

class SparkE2EWriteSpec extends IntegrationSpec {
  //scalastyle:off multiple.string.literals
  //scalastyle:off magic.number

  // TODO: moderakh should we tag tests at the test class level or test method level?
  "basic dataframe" can "write to cosmos" taggedAs (RequiresCosmosEndpoint) in {
    val cosmosEndpoint = TestConfigurations.HOST
    val cosmosMasterKey = TestConfigurations.MASTER_KEY
    val cosmosDatabase = "testDB"
    val cosmosContainer = UUID.randomUUID().toString

    val client = new CosmosClientBuilder()
      .endpoint(cosmosEndpoint)
      .key(cosmosMasterKey)
      .buildAsyncClient()

    client.createDatabaseIfNotExists(cosmosDatabase).block()
    client.getDatabase(cosmosDatabase).createContainerIfNotExists(cosmosContainer, "/id").block()

    val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
      "spark.cosmos.accountKey" -> cosmosMasterKey,
      "spark.cosmos.database" -> cosmosDatabase,
      "spark.cosmos.container" -> cosmosContainer
    )

    // TODO: moderakh do we need to recreate spark for each test or should we use a common instance?
    val spark = SparkSession.builder()
      .appName("spark connector sample")
      .master("local")
      .getOrCreate()

    // scalastyle:off underscore.import
    // scalastyle:off import.grouping
    import spark.implicits._
    // scalastyle:on underscore.import
    // scalastyle:on import.grouping

    val df = Seq(
      (299792458, "speed of light")
    ).toDF("number", "word")
    df.printSchema()

    df.write.format("cosmos.items").mode("append").options(cfg).save()

    // verify data is written

    // TODO: moderakh note that unless we use an account with strong consistency there is
    // no guarantee that the write by spark is visible to the client query;
    // wait a second to allow replication to complete.
    Thread.sleep(1000)

    val results = client.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
      .queryItems("SELECT * FROM r", classOf[ObjectNode])
      .toIterable
      .asScala
      .toArray

    assertThat(results).hasSize(1)
    assertThat(results(0).get("number").asInt()).isEqualTo(299792458)
    assertThat(results(0).get("word").asText()).isEqualTo("speed of light")

    // TODO: moderakh develop the proper pattern for proper resource cleanup after test
    client.close()
    spark.close()
  }

  //scalastyle:on magic.number
  //scalastyle:on multiple.string.literals
}
```
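The `Thread.sleep(1000)` in the test above is a fixed wait for replication under eventual consistency. One alternative, sketched here only as an illustration (not part of the PR; the object and method names are hypothetical), is a small polling helper that re-runs a probe until it yields a result or a deadline passes:

```scala
object ReplicationWait {
  // Re-run `probe` every `intervalMs` until it returns Some(_) or
  // `deadlineMs` elapses; returns None on timeout.
  def retryUntil[T](deadlineMs: Long, intervalMs: Long)(probe: () => Option[T]): Option[T] = {
    val deadline = System.currentTimeMillis() + deadlineMs
    var result: Option[T] = probe()
    while (result.isEmpty && System.currentTimeMillis() < deadline) {
      Thread.sleep(intervalMs)
      result = probe()
    }
    result
  }
}
```

In the test this could wrap the query, e.g. probing until the container returns a non-empty result set, which both bounds the wait and avoids sleeping longer than necessary on fast replicas.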
Review comments:

Reviewer: Good point - I think it might be useful to see whether we can make that decision based on "avg." document size? Like < 1 KB don't push down pruning - but for larger documents do it? Not blocking of course...

moderakh: Thanks for the suggestion, good idea. I will look into this.
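The reviewer's size-based heuristic could be sketched as follows (purely illustrative: the object and method names are hypothetical, and the 1 KB threshold is just the figure mentioned in the comment, not anything the connector actually implements in this PR):

```scala
object PruningHeuristic {
  // Hypothetical cutoff from the review comment: below ~1 KB average
  // document size, projection push-down saves little bandwidth.
  val ThresholdBytes: Long = 1024

  // Decide whether to push down column pruning based on average document size.
  def shouldPushDownPruning(totalDocumentBytes: Long, documentCount: Long): Boolean =
    documentCount > 0 && (totalDocumentBytes / documentCount) >= ThresholdBytes
}
```

The average would presumably come from container statistics; the trade-off being weighed is the per-document cost of server-side projection against the bytes saved on the wire for large documents.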