Skip to content

Commit

Permalink
Improve integration tests time and stability
Browse files Browse the repository at this point in the history
- Combine the integration tests into a tests suite
- Start the Docker container with Cartridge once per test suite
- Update testcontainers to an unreleased version with fixed reusing of
the container images
  • Loading branch information
akudiyar committed Jul 3, 2022
1 parent e026d59 commit a86d8f7
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 95 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,8 @@ codereview.rc

# Test Cartridge cluster
.rocks

# Hive
metastore_db/
derby.log
spark-warehouse/
2 changes: 1 addition & 1 deletion build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.6.2
sbt.version=1.7.0-M3
26 changes: 15 additions & 11 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ ThisBuild / developers := List(
ThisBuild / scalaVersion := scala211

val commonDependencies = Seq(
"io.tarantool" % "cartridge-driver" % "0.8.0",
"junit" % "junit" % "4.12" % Test,
"com.github.sbt" % "junit-interface" % "0.12" % Test,
"org.testcontainers" % "testcontainers" % "1.17.0" % Test,
"io.tarantool" % "testcontainers-java-tarantool" % "0.5.0" % Test,
"org.scalatest" %% "scalatest" % "3.2.9" % Test,
"org.scalamock" %% "scalamock" % "5.1.0" % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.39.5" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.5" % Test,
"org.apache.derby" % "derby" % "10.11.1.1" % Test
"io.tarantool" % "cartridge-driver" % "0.8.0",
"junit" % "junit" % "4.12" % Test,
"com.github.sbt" % "junit-interface" % "0.12" % Test,
// TODO: Fix when testcontainers-java:1.17.3 is released
"com.github.testcontainers" % "testcontainers-java" % "f1bb7f10" % Test,
"io.tarantool" % "testcontainers-java-tarantool" % "0.5.0" % Test,
"org.scalatest" %% "scalatest" % "3.2.9" % Test,
"org.scalamock" %% "scalamock" % "5.1.0" % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.39.5" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.5" % Test,
"org.apache.derby" % "derby" % "10.11.1.1" % Test
)

lazy val root = (project in file("."))
Expand Down Expand Up @@ -121,7 +122,10 @@ lazy val root = (project in file("."))
)

// Repositories
Global / resolvers += Resolver.mavenLocal
Global / resolvers ++= Seq(
Resolver.mavenLocal,
"jitpack".at("https://jitpack.io") // TODO: Remove when testcontainers-java:1.17.3 is released
)

// This is a w/a for IJ IDEA not indexing the libraries for sbt 1.3.0+
ThisBuild / useCoursier := false
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.5.2
sbt.version = 1.7.0-M3
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.tarantool.spark.connector.integration

import org.apache.spark.sql.SQLImplicits
import org.scalatest._

/**
 * Aggregates all integration test suites so that the shared Docker container
 * (Tarantool Cartridge cluster) is started once per test run instead of once
 * per suite. The nested suites are annotated with `@DoNotDiscover` and are
 * only executed through this wrapper.
 */
class IntegrationTests
    extends Suites(
      new TarantoolConnectionSpec,
      new TarantoolSparkReadClusterTest,
      new TarantoolSparkWriteClusterTest
    )
    with BeforeAndAfterAll {
  // Implicits from the shared SparkSession, for use in nested suites' DataFrame code
  protected lazy val sqlImplicits: SQLImplicits = SharedSparkContext.spark.implicits

  override def beforeAll(): Unit = {
    super.beforeAll()
    // Start the shared Cartridge container once for the whole suite run
    SharedSparkContext.setup()
  }

  override def afterAll(): Unit =
    // Stop the container in a `finally` so it is shut down even if
    // super.afterAll() (or a nested suite's cleanup) throws.
    try super.afterAll()
    finally SharedSparkContext.teardown()
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,36 @@
package io.tarantool.spark.connector.integration

import io.tarantool.spark.connector.containers.TarantoolCartridgeContainer
import io.tarantool.spark.connector.Logging
import org.apache.spark.sql.{SQLImplicits, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, Suite}
import scala.reflect.io.Directory

import java.io.File
import java.nio.file.Files
import java.util.concurrent.atomic.AtomicReference
import org.junit.rules.TemporaryFolder

/** Shares a local `SparkContext` between all test cases */
trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
/** Shared Docker container and Spark instance shared between all test cases */
object SharedSparkContext extends Logging {

val container: TarantoolCartridgeContainer = new TarantoolCartridgeContainer(
directoryBinding = "cartridge",
instancesFile = "cartridge/instances.yml",
topologyConfigurationFile = "cartridge/topology.lua",
routerPassword = "testapp-cluster-cookie"
)

private val sparkSession: AtomicReference[SparkSession] = new AtomicReference[SparkSession]()
private val master = "local"
private val appName = "tarantool-spark-test"
private lazy val warehouseLocation = Files.createTempDirectory("spark-wirehouse").toFile

override def beforeAll(): Unit = {
super.beforeAll()
def setup(): Unit =
container.start()

def setupSpark(): Unit =
if (sparkSession.get() == null) {
sparkSession.compareAndSet(
null,
Expand All @@ -32,23 +40,26 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
).getOrCreate()
)
}
}

def configureSparkSession(
private def configureSparkSession(
sessionBuilder: SparkSession.Builder,
conf: SparkConf
): SparkSession.Builder = {
sessionBuilder.config(conf).enableHiveSupport()
): SparkSession.Builder =
sessionBuilder
}
.config(conf)
.config("spark.sql.warehouse.dir", warehouseLocation.getAbsolutePath())
.config(
"javax.jdo.option.ConnectionURL",
"jdbc:derby:;databaseName=%s;create=true".format(warehouseLocation.getAbsolutePath())
)
.enableHiveSupport()

def confWithTarantoolProperties(routerPort: Int): SparkConf = {
private def confWithTarantoolProperties(routerPort: Int): SparkConf = {
val _conf = new SparkConf(false)
.setMaster(master)
.setAppName(appName)
_conf.set("tarantool.username", "admin")
_conf.set("tarantool.password", "testapp-cluster-cookie")

_conf.set("tarantool.hosts", "127.0.0.1:" + routerPort)

_conf
Expand All @@ -60,17 +71,17 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
def spark: SparkSession =
sparkSession.get()

protected lazy val sqlImplicits: SQLImplicits = spark.implicits

override def afterAll(): Unit = {
super.afterAll()
try {
val scRef = sparkSession.get()
if (sparkSession.compareAndSet(scRef, null)) {
scRef.stop()
}
} finally {
container.stop()
def teardownSpark(): Unit = {
val scRef = sparkSession.get()
if (sparkSession.compareAndSet(scRef, null)) {
scRef.stop()
}
cleanupTempDirectory()
}

def teardown(): Unit =
container.stop()

def cleanupTempDirectory(): Unit =
Directory(warehouseLocation).deleteRecursively()
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
package io.tarantool.spark.connector.connection
package io.tarantool.spark.connector.integration

import io.tarantool.spark.connector.config.TarantoolConfig
import io.tarantool.spark.connector.integration.SharedSparkContext
import io.tarantool.spark.connector.connection.TarantoolConnection
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class TarantoolConnectionSpec extends AnyFlatSpec with Matchers with SharedSparkContext {
/**
* @author Alexey Kuzin
*/
@org.scalatest.DoNotDiscover
class TarantoolConnectionSpec
extends AnyFlatSpec
with Matchers
with TarantoolSparkClusterTestSuite {

"Client in Connection" should " be initialized only once" in {
val conn1 = TarantoolConnection()
val conn2 = TarantoolConnection()

val conf: TarantoolConfig = TarantoolConfig(sc.getConf)
val conf: TarantoolConfig = TarantoolConfig(SharedSparkContext.sc.getConf)

conn1.client(conf) should equal(conn1.client(conf))
conn1.client(conf) should equal(conn2.client(conf))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.tarantool.spark.connector.integration

import org.scalatest.BeforeAndAfterEach
import org.scalatest.Suite

/**
 * Mixin for integration suites that need the shared SparkSession: creates it
 * before each test and tears it down after each test, while the Docker
 * container lifecycle is managed once per run by the enclosing suite.
 *
 * @author Alexey Kuzin
 */
trait TarantoolSparkClusterTestSuite extends BeforeAndAfterEach { this: Suite =>

  override protected def beforeEach(): Unit = {
    super.beforeEach()
    // Lazily (re)create the shared SparkSession for this test
    SharedSparkContext.setupSpark()
  }

  override protected def afterEach(): Unit =
    // Tear the SparkSession down in a `finally` so it is released even if
    // super.afterEach() throws; otherwise later tests would reuse stale state.
    try super.afterEach()
    finally SharedSparkContext.teardownSpark()
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import org.scalatest.matchers.should.Matchers

import scala.collection.JavaConverters.seqAsJavaListConverter

class TarantoolSparkReadClusterClientTest
/**
* @author Alexey Kuzin
*/
@org.scalatest.DoNotDiscover
class TarantoolSparkReadClusterTest
extends AnyFunSuite
with Matchers
with BeforeAndAfterAll
with BeforeAndAfterEach
with SharedSparkContext {
with TarantoolSparkClusterTestSuite {

//space format:
// s = box.schema.space.create('_spark_test_space')
Expand All @@ -29,18 +31,18 @@ class TarantoolSparkReadClusterClientTest

override protected def beforeEach(): Unit = {
super.beforeEach()
container.executeScript("test_setup.lua").get
SharedSparkContext.container.executeScript("test_setup.lua").get
}

override protected def afterEach(): Unit = {
container.executeScript("test_teardown.lua").get
SharedSparkContext.container.executeScript("test_teardown.lua").get
super.afterEach()
}

test("Create connection") {
val tarantoolConnection = TarantoolConnection()
val tarantoolClient = Option(
tarantoolConnection.client(TarantoolConfig(sc.getConf))
tarantoolConnection.client(TarantoolConfig(SharedSparkContext.sc.getConf))
)
tarantoolClient should not be Option.empty
val spaceHolder = tarantoolClient.get.metadata.getSpaceByName(SPACE_NAME)
Expand All @@ -50,12 +52,12 @@ class TarantoolSparkReadClusterClientTest

test("Load the whole space") {
val rdd: Array[TarantoolTuple] =
sc.tarantoolSpace("test_space", Conditions.any()).collect()
SharedSparkContext.sc.tarantoolSpace("test_space", Conditions.any()).collect()
rdd.length > 0 should equal(true)
}

test("Load the whole space into a DataFrame") {
val df = spark.read
val df = SharedSparkContext.spark.read
.format("org.apache.spark.sql.tarantool")
.option("tarantool.space", "test_space")
.load()
Expand All @@ -71,7 +73,8 @@ class TarantoolSparkReadClusterClientTest
.indexGreaterThan("id", List(1).asJava)
.withLimit(2)
.startAfter(startTuple)
val rdd: Array[TarantoolTuple] = sc.tarantoolSpace("test_space", cond).collect()
val rdd: Array[TarantoolTuple] =
SharedSparkContext.sc.tarantoolSpace("test_space", cond).collect()

rdd.length should equal(2)
rdd.apply(0).getInteger("id") should equal(2)
Expand All @@ -89,13 +92,13 @@ class TarantoolSparkReadClusterClientTest
book
}
val rdd: Array[Book] =
sc.tarantoolSpace[Book]("test_space", Conditions.any()).collect()
SharedSparkContext.sc.tarantoolSpace[Book]("test_space", Conditions.any()).collect()
rdd.length > 0 should equal(true)
rdd.find(b => b.year == 1605) should not be None
}

test("Load the whole space into a Dataset with schema auto-determination") {
val df = spark.read
val df = SharedSparkContext.spark.read
.format("org.apache.spark.sql.tarantool")
.option("tarantool.space", "test_space")
.load()
Expand Down
Loading

0 comments on commit a86d8f7

Please sign in to comment.