Cosmos Spark End to End Integration Test against Cosmos Emulator runs in CI #17952

Merged
8 changes: 8 additions & 0 deletions eng/pipelines/templates/stages/cosmos-sdk-client.yml
@@ -119,6 +119,14 @@ stages:
    PROTOCOLS: '["Tcp"]'
    DESIRED_CONSISTENCIES: '["Session"]'
    AdditionalArgs: '-DargLine="-DACCOUNT_HOST=https://localhost:8081/"'
  Spark_Integration_Tests_Java8:
    OSVmImage: 'windows-2019'
    JavaTestVersion: '1.8'
    ProfileFlag: '-PsparkE2E'
    DisplayName: 'Spark Integration Tests targeting Cosmos Emulator'
    PROTOCOLS: '["Tcp"]'
    DESIRED_CONSISTENCIES: '["Session"]'
    AdditionalArgs: '-DargLine="-DACCOUNT_HOST=https://localhost:8081/"'

TestStepMavenInputs:
  goals: clean verify
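Assuming the shared test-step template consumes ProfileFlag, goals, and AdditionalArgs the same way as the existing matrix entries (an inference from the values above, not something stated in this file), the new leg boils down to roughly mvn clean verify -PsparkE2E -DargLine="-DACCOUNT_HOST=https://localhost:8081/" run against a locally running Cosmos Emulator.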
162 changes: 122 additions & 40 deletions sdk/cosmos/azure-cosmos-spark_3-0_2-12/pom.xml
@@ -166,46 +166,6 @@
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-surefire-plugin;external_dependency} -->
        <configuration>
          <includes>
            <include>**/*.*</include>
            <include>**/*Test.*</include>
            <include>**/*Suite.*</include>
            <include>**/*Spec.*</include>
          </includes>
          <skipTests>true</skipTests>
        </configuration>
      </plugin>
      <!-- To use the ScalaTest Maven plugin, SureFire needs to be disabled and ScalaTest enabled -->
      <!-- enable scalatest -->
      <plugin>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest-maven-plugin</artifactId>
        <version>2.0.2</version> <!-- {x-version-update;cosmos_org.scalatest:scalatest-maven-plugin;external_dependency} -->
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
          <junitxml>.</junitxml>
          <filereports>SparkTestSuite.txt</filereports>
        </configuration>
        <executions>
          <execution>
            <id>test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
          <execution>
            <id>scala-test</id>
            <goals>
              <goal>test</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-enforcer-plugin</artifactId>
@@ -304,10 +264,132 @@
          <skip>true</skip>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.jacoco</groupId>
        <artifactId>jacoco-maven-plugin</artifactId>
        <version>0.8.5</version> <!-- {x-version-update;org.jacoco:jacoco-maven-plugin;external_dependency} -->
        <configuration>
          <skip>true</skip>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <profiles>
    <profile>
      <id>unit</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-surefire-plugin;external_dependency} -->
            <configuration>
              <includes>
                <include>**/*.*</include>
                <include>**/*Test.*</include>
                <include>**/*Suite.*</include>
                <include>**/*Spec.*</include>
              </includes>
              <skipTests>true</skipTests>
            </configuration>
          </plugin>
          <!-- To use the ScalaTest Maven plugin, SureFire needs to be disabled and ScalaTest enabled -->
          <!-- enable scalatest -->
          <plugin>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest-maven-plugin</artifactId>
            <version>2.0.2</version> <!-- {x-version-update;cosmos_org.scalatest:scalatest-maven-plugin;external_dependency} -->
            <configuration>
              <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
              <junitxml>.</junitxml>
              <filereports>SparkTestSuite.txt</filereports>
            </configuration>
            <executions>
              <execution>
                <configuration>
                  <tagsToExclude>requiresCosmosEndpoint</tagsToExclude>
                </configuration>
                <id>test</id>
                <goals>
                  <goal>test</goal>
                </goals>
              </execution>
              <execution>
                <configuration>
                  <tagsToExclude>requiresCosmosEndpoint</tagsToExclude>
                </configuration>
                <id>scala-test</id>
                <goals>
                  <goal>test</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
    <profile>
      <!-- integration tests, requires Cosmos DB Emulator Endpoint -->
      <id>sparkE2E</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>3.0.0-M3</version> <!-- {x-version-update;org.apache.maven.plugins:maven-surefire-plugin;external_dependency} -->
            <configuration>
              <includes>
                <include>**/*.*</include>
                <include>**/*Test.*</include>
                <include>**/*Suite.*</include>
                <include>**/*Spec.*</include>
              </includes>
              <skipTests>true</skipTests>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest-maven-plugin</artifactId>
            <version>2.0.2</version> <!-- {x-version-update;cosmos_org.scalatest:scalatest-maven-plugin;external_dependency} -->
            <configuration>
              <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
              <junitxml>.</junitxml>
              <filereports>SparkTestSuite.txt</filereports>
            </configuration>
            <executions>
              <execution>
                <configuration>
                  <tagsToInclude>requiresCosmosEndpoint</tagsToInclude>
                </configuration>
                <id>test</id>
                <goals>
                  <goal>test</goal>
                </goals>
              </execution>
              <execution>
                <configuration>
                  <tagsToInclude>requiresCosmosEndpoint</tagsToInclude>
                </configuration>
                <id>scala-test</id>
                <goals>
                  <goal>test</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>


    <!-- Library cannot build for Java 10 and below -->
    <profile>
      <id>java8</id>
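The net effect of the two profiles above is that the same scalatest-maven-plugin executions run in both and only the tag filter differs: the unit profile excludes tests tagged requiresCosmosEndpoint, while the sparkE2E profile runs only those tests. A minimal, self-contained sketch of how a test opts into either side (the spec class, its test bodies, and the locally defined tag object are illustrative; the tag object mirrors the RequiresCosmosEndpoint object this PR adds below):

import org.scalatest.Tag
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

// mirrors the tag object added later in this PR
object RequiresCosmosEndpoint extends Tag("requiresCosmosEndpoint")

class TagFilteringExampleSpec extends AnyFlatSpec with Matchers {

  // untagged: runs under the unit profile (and is skipped by -PsparkE2E, which only includes the tag)
  "a pure unit test" should "run without an emulator" in {
    (1 + 1) shouldEqual 2
  }

  // tagged: excluded by the unit profile, included by -PsparkE2E
  "an emulator-backed test" should "run only in the sparkE2E profile" taggedAs RequiresCosmosEndpoint in {
    succeed // would exercise the Cosmos Emulator here
  }
}

With that layout, the existing CI legs keep running only the untagged tests, and the new Spark_Integration_Tests_Java8 leg (via -PsparkE2E) picks up only the emulator-backed ones.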
@@ -51,7 +51,21 @@ case class CosmosScanBuilder(config: CaseInsensitiveStringMap)
    CosmosScan(config.asScala.toMap, this.processedPredicates.get.cosmosParametrizedQuery)
  }

  /**
   * Applies column pruning w.r.t. the given requiredSchema.
   *
   * Implementation should try its best to prune the unnecessary columns or nested fields, but it's
   * also OK to do the pruning partially, e.g., a data source may not be able to prune nested
   * fields, and only prune top-level columns.
   *
   * Note that, {@link Scan#readSchema()} implementation should take care of the column
   * pruning applied here.
   */
  override def pruneColumns(requiredSchema: StructType): Unit = {
    // TODO moderakh add projection to the query
    // TODO moderakh: we need to decide whether to do a push down or not on the projection
Review comment (Member): Good point - I think it might be useful to see whether we can make that decision based on "avg." document size? Like < 1 KB don't push down pruning - but for larger documents do it?

Review comment (Member): Not blocking of course...

Reply (@moderakh, Contributor / PR author, Dec 8, 2020): Thanks for the suggestion, good idea. I will look into this.

    // spark will do column pruning on the returned data.
    // pushing down the projection to cosmos has tradeoffs:
    //  - it increases consumed RU in the cosmos query engine
    //  - it decreases the network-layer latency
  }
}
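To make the trade-off discussed above concrete, the sketch below (not the connector's code; the object, the helper name, and the simple c.<field> mapping are illustrative assumptions that ignore nested fields and escaping) shows what a pushed-down projection derived from the pruned schema could look like:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ProjectionPushDownSketch {
  // turn the pruned schema into a Cosmos SQL projection list, or "*" if nothing was pruned
  def toProjection(requiredSchema: StructType): String = {
    if (requiredSchema.fields.isEmpty) "*"
    else requiredSchema.fieldNames.map(name => s"c.$name").mkString(", ")
  }

  def main(args: Array[String]): Unit = {
    val prunedSchema = StructType(Seq(
      StructField("number", IntegerType),
      StructField("word", StringType)))
    // prints: SELECT c.number, c.word FROM c
    println(s"SELECT ${toProjection(prunedSchema)} FROM c")
  }
}

Whether emitting such a query is worth the extra RU charge versus the smaller payload is exactly the open question raised in the review thread above.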
@@ -171,7 +171,9 @@ case class FilterAnalyzer() {
      case _: Filter =>
        // the unsupported filter will be applied by the spark platform itself.
        // TODO: moderakh how are count, avg, min, max pushed down? or orderBy?
        //  are they provided in the projected push down columns?
        //  spark 3.0 does not support aggregate push downs, but spark 3.1 will:
        //  https://issues.apache.org/jira/browse/SPARK-22390
        //  https://github.com/apache/spark/pull/29695/files
        false
    }
  }
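The "unsupported filter will be applied by the spark platform itself" comment refers to the DataSource V2 contract in which a source only accepts the predicates it can translate and hands everything else back to Spark. A minimal sketch of that partitioning (the object and the canPushDown rule are illustrative, not FilterAnalyzer's actual logic):

import org.apache.spark.sql.sources.{EqualTo, Filter, IsNotNull}

object ResidualFilterSketch {
  // stand-in for the per-filter check: only equality predicates are pushed down here
  def canPushDown(filter: Filter): Boolean = filter match {
    case _: EqualTo => true
    case _ => false // everything else stays with Spark, as the comment above describes
  }

  def main(args: Array[String]): Unit = {
    val filters: Array[Filter] = Array(EqualTo("word", "speed of light"), IsNotNull("number"))
    val (pushed, residual) = filters.partition(canPushDown)
    println(s"pushed to Cosmos: ${pushed.mkString(", ")}")
    println(s"evaluated by Spark: ${residual.mkString(", ")}")
  }
}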
@@ -0,0 +1,16 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.spark

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Inside, Inspectors, OptionValues}

abstract class IntegrationSpec extends AnyFlatSpec
  with BeforeAndAfterAll
  with BeforeAndAfterEach
  with Matchers
  with OptionValues
  with Inside
  with Inspectors
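IntegrationSpec mixes in BeforeAndAfterAll and BeforeAndAfterEach, which is where the clean-up TODOs in SparkE2EWriteSpec below would naturally be addressed. A small, hypothetical sketch (the class name and test body are illustrative only) of sharing one SparkSession per suite through those hooks:

package com.azure.cosmos.spark

import org.apache.spark.sql.SparkSession

class SharedResourcesExampleSpec extends IntegrationSpec {
  private var spark: SparkSession = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    // create expensive resources once per suite instead of once per test
    spark = SparkSession.builder().appName("shared for the whole suite").master("local").getOrCreate()
  }

  override def afterAll(): Unit = {
    try spark.close() // always release the session, even if a test failed
    finally super.afterAll()
  }

  "a test" should "reuse the shared session" in {
    spark.range(3).count() shouldEqual 3L
  }
}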
@@ -0,0 +1,9 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.spark

import org.scalatest.Tag

// Used to identify integration tests which require a Cosmos DB endpoint
object RequiresCosmosEndpoint extends Tag("requiresCosmosEndpoint")
@@ -0,0 +1,90 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.spark

import java.util.UUID

import com.azure.cosmos.CosmosClientBuilder
import com.azure.cosmos.implementation.TestConfigurations
import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.spark.sql.SparkSession
import org.assertj.core.api.Assertions.assertThat
// scalastyle:off underscore.import
import scala.collection.JavaConverters._
// scalastyle:on underscore.import


// TODO moderakh rely on a shared database/container for the tests to avoid creating many of them
// TODO moderakh we need to clean up databases after creation.
// TODO use facility from V4 SDK?
// TODO do proper clean up for spark session, client, etc.

class SparkE2EWriteSpec extends IntegrationSpec {
  //scalastyle:off multiple.string.literals
  //scalastyle:off magic.number

  // TODO: moderakh should we tag tests at the test class level or test method level?
  "basic dataframe" can "write to cosmos" taggedAs (RequiresCosmosEndpoint) in {
    val cosmosEndpoint = TestConfigurations.HOST
    val cosmosMasterKey = TestConfigurations.MASTER_KEY
    val cosmosDatabase = "testDB"
    val cosmosContainer = UUID.randomUUID().toString

    val client = new CosmosClientBuilder()
      .endpoint(cosmosEndpoint)
      .key(cosmosMasterKey)
      .buildAsyncClient()

    client.createDatabaseIfNotExists(cosmosDatabase).block()
    client.getDatabase(cosmosDatabase).createContainerIfNotExists(cosmosContainer, "/id").block()

    val cfg = Map("spark.cosmos.accountEndpoint" -> cosmosEndpoint,
      "spark.cosmos.accountKey" -> cosmosMasterKey,
      "spark.cosmos.database" -> cosmosDatabase,
      "spark.cosmos.container" -> cosmosContainer
    )

    // TODO: moderakh do we need to recreate spark for each test or should we use a common instance?
    val spark = SparkSession.builder()
      .appName("spark connector sample")
      .master("local")
      .getOrCreate()

    // scalastyle:off underscore.import
    // scalastyle:off import.grouping
    import spark.implicits._
    // scalastyle:on underscore.import
    // scalastyle:on import.grouping

    val df = Seq(
      (299792458, "speed of light")
    ).toDF("number", "word")
    df.printSchema()

    df.write.format("cosmos.items").mode("append").options(cfg).save()

    // verify data is written

    // TODO: moderakh note that unless we use an account with strong consistency there is no
    //  guarantee that the write by Spark is visible to the client query;
    //  wait for a second to allow replication to complete.
    Thread.sleep(1000)

    val results = client.getDatabase(cosmosDatabase).getContainer(cosmosContainer)
      .queryItems("SELECT * FROM r", classOf[ObjectNode])
      .toIterable
      .asScala
      .toArray

    assertThat(results).hasSize(1)
    assertThat(results(0).get("number").asInt()).isEqualTo(299792458)
    assertThat(results(0).get("word").asText()).isEqualTo("speed of light")

    // TODO: moderakh develop the proper pattern for resource cleanup after each test
    client.close()
    spark.close()
  }

  //scalastyle:on magic.number
  //scalastyle:on multiple.string.literals
}
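One possible refinement (an assumption, not part of this PR) for the consistency TODO above: instead of a fixed Thread.sleep(1000), poll the query with a bounded deadline so the test waits only as long as replication actually needs. The helper name and signature below are illustrative:

object EventualConsistencyHelper {
  // re-run the query until it returns something or the deadline passes
  def pollUntilNonEmpty[T](maxWaitMillis: Long, intervalMillis: Long = 100)(query: () => Array[T]): Array[T] = {
    val deadline = System.currentTimeMillis() + maxWaitMillis
    var results = query()
    while (results.isEmpty && System.currentTimeMillis() < deadline) {
      Thread.sleep(intervalMillis)
      results = query()
    }
    results
  }
}

In the spec above, the sleep-then-query block could then become something like pollUntilNonEmpty(5000)(() => runQuery()), where runQuery() wraps the existing queryItems call, with the assertions left unchanged.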