diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index faff078..97d7df8 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -21,15 +21,15 @@ jobs: matrix: include: - scala: "2.11.12" - tarantool: "2.8" + tarantool: "2.11" router_port: "3301" router_api_port: "8081" - scala: "2.12.16" - tarantool: "2.8" + tarantool: "2.11" router_port: "3331" router_api_port: "8381" - scala: "2.13.10" - tarantool: "2.8" + tarantool: "2.11" router_port: "3361" router_api_port: "8681" steps: diff --git a/build.sbt b/build.sbt index 1fb4d7b..db23352 100644 --- a/build.sbt +++ b/build.sbt @@ -35,16 +35,17 @@ ThisBuild / developers := List( ThisBuild / scalaVersion := scala211 val commonDependencies = Seq( - "io.tarantool" % "cartridge-driver" % "0.10.1", - "junit" % "junit" % "4.12" % Test, - "com.github.sbt" % "junit-interface" % "0.12" % Test, - "org.scalatest" %% "scalatest" % "3.2.14" % Test, + "io.tarantool" % "cartridge-driver" % "0.12.0", + "junit" % "junit" % "4.13.2" % Test, + "com.github.sbt" % "junit-interface" % "0.13.3" % Test, + "org.scalatest" %% "scalatest" % "3.2.16" % Test, "org.scalamock" %% "scalamock" % "5.2.0" % Test, - "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.12" % Test, - "ch.qos.logback" % "logback-core" % "1.2.5" % Test, - "ch.qos.logback" % "logback-classic" % "1.2.5" % Test, + "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.17" % Test, + "ch.qos.logback" % "logback-core" % "1.2.12" % Test, + "ch.qos.logback" % "logback-classic" % "1.2.12" % Test, "org.apache.derby" % "derby" % "10.11.1.1" % Test, - "io.tarantool" % "testcontainers-java-tarantool" % "0.5.3" % Test + "io.tarantool" % "testcontainers-java-tarantool" % "1.0.0" % Test, + "org.msgpack" % "msgpack-core" % "0.9.0" % Test ).map( _.exclude("io.netty", "netty-all") .exclude("io.netty", "netty-transport") @@ -116,7 +117,8 @@ lazy val root = (project in file(".")) // Test frameworks options testOptions ++= Seq( Tests.Argument(TestFrameworks.JUnit, "-v"), - Tests.Setup(() => System.setSecurityManager(null)) // SPARK-22918 + Tests.Setup(() => System.setSecurityManager(null)), // SPARK-22918 + Tests.Argument("-oF") ), // Publishing settings publishTo := { diff --git a/src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala b/src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala index 29eef0b..408cafa 100644 --- a/src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala +++ b/src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala @@ -19,6 +19,8 @@ import java.lang.{ Short => JShort } import java.util.{ArrayList => JList, HashMap => JMap} +import java.sql.Timestamp +import java.time.Instant import scala.collection.JavaConverters.{mapAsJavaMapConverter, seqAsJavaListConverter} /** @@ -84,6 +86,7 @@ object MapFunctions { case DoubleType => classOf[java.lang.Double] case FloatType => classOf[java.lang.Float] case _: DecimalType => classOf[java.math.BigDecimal] + case _: TimestampType => classOf[java.time.Instant] case mapType: MapType => val keyClass = dataTypeToJavaClass(mapType.keyType) val valueClass = dataTypeToJavaClass(mapType.valueType) @@ -170,6 +173,7 @@ object MapFunctions { case value: Long => value.asInstanceOf[JLong] case value: Float => value.asInstanceOf[JFloat] case value: Double => value.asInstanceOf[JDouble] + case value: Timestamp => value.toInstant().asInstanceOf[Instant] case value: Any => identity(value) } } diff --git a/src/main/scala/org/apache/spark/sql/tarantool/TarantoolSchema.scala b/src/main/scala/org/apache/spark/sql/tarantool/TarantoolSchema.scala index f336de1..cd86c6d 100644 --- a/src/main/scala/org/apache/spark/sql/tarantool/TarantoolSchema.scala +++ b/src/main/scala/org/apache/spark/sql/tarantool/TarantoolSchema.scala @@ -74,6 +74,7 @@ object TarantoolFieldTypes extends Enumeration { val BOOLEAN: TarantoolFieldType = TarantoolFieldType("boolean", DataTypes.BooleanType) val DECIMAL: TarantoolFieldType = TarantoolFieldType("decimal", createDecimalType()) val UUID: TarantoolFieldType = TarantoolFieldType("uuid", DataTypes.StringType) + val DATETIME: TarantoolFieldType = TarantoolFieldType("datetime", DataTypes.TimestampType) val ARRAY: TarantoolFieldType = TarantoolFieldType("array", DataTypes.createArrayType(DataTypes.StringType, true)) diff --git a/src/test/java/io/tarantool/spark/connector/integration/JavaSparkContextFunctionsTest.java b/src/test/java/io/tarantool/spark/connector/integration/JavaSparkContextFunctionsTest.java index 5b361d8..1264aea 100644 --- a/src/test/java/io/tarantool/spark/connector/integration/JavaSparkContextFunctionsTest.java +++ b/src/test/java/io/tarantool/spark/connector/integration/JavaSparkContextFunctionsTest.java @@ -23,7 +23,7 @@ public class JavaSparkContextFunctionsTest extends SharedJavaSparkContext { @Before public void beforeEach() { try { - container.executeScript("test_setup.lua").get(); + container.executeScript("test_setup.lua"); } catch (Exception e) { throw new RuntimeException("Failed to set up test: ", e); } @@ -32,7 +32,7 @@ public void beforeEach() { @After public void afterEach() { try { - container.executeScript("test_teardown.lua").get(); + container.executeScript("test_teardown.lua"); } catch (Exception e) { throw new RuntimeException("Failed to set up test: ", e); } diff --git a/src/test/resources/Dockerfile b/src/test/resources/Dockerfile index 9145d83..4ad4c5f 100644 --- a/src/test/resources/Dockerfile +++ b/src/test/resources/Dockerfile @@ -1,19 +1,13 @@ -FROM tgagor/centos:stream8 AS tarantool-base -ARG TARANTOOL_VERSION=2.8 +ARG TARANTOOL_VERSION=2.11.0 +FROM tarantool/tarantool:${TARANTOOL_VERSION}-centos7 AS cartridge-base + ARG TARANTOOL_SERVER_USER="tarantool" ARG TARANTOOL_SERVER_UID=1000 ARG TARANTOOL_SERVER_GROUP="tarantool" ARG TARANTOOL_SERVER_GID=1000 -ARG TARANTOOL_WORKDIR="/app" -ARG TARANTOOL_RUNDIR="/tmp/run" -ARG TARANTOOL_DATADIR="/tmp/data" -ARG TARANTOOL_INSTANCES_FILE="./instances.yml" -ENV TARANTOOL_WORKDIR=$TARANTOOL_WORKDIR -ENV TARANTOOL_RUNDIR=$TARANTOOL_RUNDIR -ENV TARANTOOL_DATADIR=$TARANTOOL_DATADIR -ENV TARANTOOL_INSTANCES_FILE=$TARANTOOL_INSTANCES_FILE -RUN curl -L https://tarantool.io/installer.sh | VER=$TARANTOOL_VERSION /bin/bash -s -- --repo-only && \ - yum -y install cmake make gcc gcc-c++ git unzip tarantool tarantool-devel cartridge-cli && \ +# a yum bug requires setting ulimit, see https://bugzilla.redhat.com/show_bug.cgi?id=1537564 +RUN ulimit -n 1024 &&\ + yum -y install cmake make gcc gcc-c++ git unzip cartridge-cli && \ yum clean all RUN groupadd -g $TARANTOOL_SERVER_GID $TARANTOOL_SERVER_GROUP && \ useradd -u $TARANTOOL_SERVER_UID -g $TARANTOOL_SERVER_GID -m -s /bin/bash $TARANTOOL_SERVER_USER \ @@ -21,8 +15,20 @@ RUN groupadd -g $TARANTOOL_SERVER_GID $TARANTOOL_SERVER_GROUP && \ USER $TARANTOOL_SERVER_USER:$TARANTOOL_SERVER_GROUP RUN cartridge version -FROM tarantool-base AS cartridge-base +FROM cartridge-base AS cartridge-app +ARG TARANTOOL_WORKDIR="/app" +ARG TARANTOOL_RUNDIR="/tmp/run" +ARG TARANTOOL_DATADIR="/tmp/data" +ARG TARANTOOL_LOGDIR="/tmp/log" +ARG TARANTOOL_INSTANCES_FILE="./instances.yml" ARG TARANTOOL_CLUSTER_COOKIE="testapp-cluster-cookie" +ENV TARANTOOL_WORKDIR=$TARANTOOL_WORKDIR +ENV TARANTOOL_RUNDIR=$TARANTOOL_RUNDIR +ENV TARANTOOL_DATADIR=$TARANTOOL_DATADIR +ENV TARANTOOL_LOGDIR=$TARANTOOL_LOGDIR +ENV TARANTOOL_INSTANCES_FILE=$TARANTOOL_INSTANCES_FILE ENV TARANTOOL_CLUSTER_COOKIE=$TARANTOOL_CLUSTER_COOKIE WORKDIR $TARANTOOL_WORKDIR -CMD cartridge build && cartridge start --run-dir=$TARANTOOL_RUNDIR --data-dir=$TARANTOOL_DATADIR --cfg=$TARANTOOL_INSTANCES_FILE \ No newline at end of file +CMD cartridge build && \ + cartridge start --run-dir=$TARANTOOL_RUNDIR --data-dir=$TARANTOOL_DATADIR \ + --log-dir=$TARANTOOL_LOGDIR --cfg=$TARANTOOL_INSTANCES_FILE \ No newline at end of file diff --git a/src/test/resources/cartridge/instances_2.11.12_2.8.yml b/src/test/resources/cartridge/instances_2.11.12_2.11.yml similarity index 100% rename from src/test/resources/cartridge/instances_2.11.12_2.8.yml rename to src/test/resources/cartridge/instances_2.11.12_2.11.yml diff --git a/src/test/resources/cartridge/instances_2.12.16_2.8.yml b/src/test/resources/cartridge/instances_2.12.16_2.11.yml similarity index 100% rename from src/test/resources/cartridge/instances_2.12.16_2.8.yml rename to src/test/resources/cartridge/instances_2.12.16_2.11.yml diff --git a/src/test/resources/cartridge/instances_2.13.10_2.8.yml b/src/test/resources/cartridge/instances_2.13.10_2.11.yml similarity index 100% rename from src/test/resources/cartridge/instances_2.13.10_2.8.yml rename to src/test/resources/cartridge/instances_2.13.10_2.11.yml diff --git a/src/test/resources/cartridge/topology_2.11.12_2.8.lua b/src/test/resources/cartridge/topology_2.11.12_2.11.lua similarity index 100% rename from src/test/resources/cartridge/topology_2.11.12_2.8.lua rename to src/test/resources/cartridge/topology_2.11.12_2.11.lua diff --git a/src/test/resources/cartridge/topology_2.12.16_2.8.lua b/src/test/resources/cartridge/topology_2.12.16_2.11.lua similarity index 100% rename from src/test/resources/cartridge/topology_2.12.16_2.8.lua rename to src/test/resources/cartridge/topology_2.12.16_2.11.lua diff --git a/src/test/resources/cartridge/topology_2.13.10_2.8.lua b/src/test/resources/cartridge/topology_2.13.10_2.11.lua similarity index 100% rename from src/test/resources/cartridge/topology_2.13.10_2.8.lua rename to src/test/resources/cartridge/topology_2.13.10_2.11.lua diff --git a/src/test/scala/io/tarantool/driver/api/metadata/TestSpaceMetadata.scala b/src/test/scala/io/tarantool/driver/api/metadata/TestSpaceMetadata.scala index ac50df9..5b539ff 100644 --- a/src/test/scala/io/tarantool/driver/api/metadata/TestSpaceMetadata.scala +++ b/src/test/scala/io/tarantool/driver/api/metadata/TestSpaceMetadata.scala @@ -91,7 +91,7 @@ object TestSpaceMetadata { "discount" -> TestFieldMetadata("discount", "number", 6), "favourite_constant" -> TestFieldMetadata("favourite_constant", "double", 7), "married" -> TestFieldMetadata("married", "boolean", 8), - "updated" -> TestFieldMetadata("updated", "unsigned", 9) + "updated" -> TestFieldMetadata("updated", "datetime", 9) ) def apply(): TarantoolSpaceMetadata = @@ -103,7 +103,7 @@ object TestSpaceWithArrayMetadata { private val spaceFieldMetadata = Map( "order_id" -> TestFieldMetadata("order_id", "string", 0), "order_items" -> TestFieldMetadata("order_items", "array", 1), - "updated" -> TestFieldMetadata("updated", "integer", 2) + "updated" -> TestFieldMetadata("updated", "datetime", 2) ) def apply(): TarantoolSpaceMetadata = @@ -115,7 +115,7 @@ object TestSpaceWithMapMetadata { private val spaceFieldMetadata = Map( "id" -> TestFieldMetadata("order_id", "string", 0), "settings" -> TestFieldMetadata("order_items", "map", 1), - "updated" -> TestFieldMetadata("updated", "integer", 2) + "updated" -> TestFieldMetadata("updated", "datetime", 2) ) def apply(): TarantoolSpaceMetadata = diff --git a/src/test/scala/io/tarantool/spark/connector/containers/TarantoolCartridgeContainer.scala b/src/test/scala/io/tarantool/spark/connector/containers/TarantoolCartridgeContainer.scala index 41ef93e..f09fda7 100644 --- a/src/test/scala/io/tarantool/spark/connector/containers/TarantoolCartridgeContainer.scala +++ b/src/test/scala/io/tarantool/spark/connector/containers/TarantoolCartridgeContainer.scala @@ -5,6 +5,7 @@ import org.slf4j.{Logger, LoggerFactory} import org.testcontainers.containers.output.Slf4jLogConsumer import org.testcontainers.containers.{TarantoolCartridgeContainer => JavaTarantoolCartridgeContainer} import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.containers.Container import scala.collection.JavaConverters.mapAsJavaMapConverter @@ -51,8 +52,12 @@ case class TarantoolCartridgeContainer( def getRouterPort: Int = container.getRouterPort def getAPIPort: Int = container.getAPIPort - def executeScript(scriptResourcePath: String): CompletableFuture[util.List[_]] = - container.executeScript(scriptResourcePath) + def executeScript(scriptResourcePath: String): Unit = { + val result = container.executeScript(scriptResourcePath) + if (result.getExitCode != 0) { + throw new RuntimeException("Failed to execute script. Details:\n" + result.getStderr) + } + } } object TarantoolCartridgeContainer { diff --git a/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkReadClusterTest.scala b/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkReadClusterTest.scala index 1739968..1aa662a 100644 --- a/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkReadClusterTest.scala +++ b/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkReadClusterTest.scala @@ -28,11 +28,11 @@ class TarantoolSparkReadClusterTest extends AnyFunSuite with Matchers with Taran override protected def beforeEach(): Unit = { super.beforeEach() - SharedSparkContext.container.executeScript("test_setup.lua").get + SharedSparkContext.container.executeScript("test_setup.lua") } override protected def afterEach(): Unit = { - SharedSparkContext.container.executeScript("test_teardown.lua").get + SharedSparkContext.container.executeScript("test_teardown.lua") super.afterEach() } diff --git a/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkWriteClusterTest.scala b/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkWriteClusterTest.scala index 02c3ed5..f8b1176 100644 --- a/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkWriteClusterTest.scala +++ b/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkWriteClusterTest.scala @@ -129,7 +129,7 @@ class TarantoolSparkWriteClusterTest extends AnyFunSuite with Matchers with Tara thrownException.getMessage should include("already exists in Tarantool") // Clear the data and check that they are written in ErrorIfExists mode - SharedSparkContext.container.executeScript("test_teardown.lua").get() + SharedSparkContext.container.executeScript("test_teardown.lua") df = SharedSparkContext.spark.createDataFrame( SharedSparkContext.spark.sparkContext.parallelize( @@ -175,7 +175,7 @@ class TarantoolSparkWriteClusterTest extends AnyFunSuite with Matchers with Tara actual.foreach(item => item.getString("order_type") should endWith("444")) // Clear the data and check if they are written in Ignore mode - SharedSparkContext.container.executeScript("test_teardown.lua").get() + SharedSparkContext.container.executeScript("test_teardown.lua") df.write .format("org.apache.spark.sql.tarantool") diff --git a/src/test/scala/org/apache/spark/sql/tarantool/MapFunctionsSpec.scala b/src/test/scala/org/apache/spark/sql/tarantool/MapFunctionsSpec.scala index d142598..4979891 100644 --- a/src/test/scala/org/apache/spark/sql/tarantool/MapFunctionsSpec.scala +++ b/src/test/scala/org/apache/spark/sql/tarantool/MapFunctionsSpec.scala @@ -10,6 +10,7 @@ import org.scalatest.Inspectors.forAll import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.msgpack.value.Value import java.time.Instant import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter} @@ -35,7 +36,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { StructField("discount", FloatType, nullable = true), StructField("favourite_constant", DoubleType, nullable = true), StructField("married", BooleanType, nullable = true), - StructField("updated", LongType, nullable = true) + StructField("updated", TimestampType, nullable = true) ) ) @@ -43,7 +44,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { Array( StructField("order_id", StringType, nullable = true), StructField("order_items", DataTypes.createArrayType(IntegerType), nullable = true), - StructField("updated", LongType, nullable = true) + StructField("updated", TimestampType, nullable = true) ) ) @@ -51,7 +52,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { Array( StructField("id", StringType, nullable = true), StructField("settings", DataTypes.createMapType(StringType, StringType), nullable = true), - StructField("updated", LongType, nullable = true) + StructField("updated", TimestampType, nullable = true) ) ) @@ -65,22 +66,21 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a tuple with simple values of different types" in { - val time = Instant.now().getEpochSecond - val tuple = new TarantoolTupleImpl( + val tupleFactoryWithSchema = new DefaultTarantoolTupleFactory(defaultMapper, TestSpaceMetadata()) + val time = Instant.now() + val tuple = tupleFactoryWithSchema.create( Seq( - "Akakiy", - "Akakievich", - "Ivanov", - null, - 38, - new java.math.BigDecimal(200), - 0.5f, - Math.PI, - false, - time - ).asJava, - defaultMapper, - TestSpaceMetadata() + defaultMapper.toValue("Akakiy").asInstanceOf[Value], + defaultMapper.toValue("Akakievich").asInstanceOf[Value], + defaultMapper.toValue("Ivanov").asInstanceOf[Value], + defaultMapper.toValue(null).asInstanceOf[Value], + defaultMapper.toValue(38).asInstanceOf[Value], + defaultMapper.toValue(new java.math.BigDecimal(200)).asInstanceOf[Value], + defaultMapper.toValue(0.5f).asInstanceOf[Value], + defaultMapper.toValue(Math.PI).asInstanceOf[Value], + defaultMapper.toValue(false).asInstanceOf[Value], + defaultMapper.toValue(time).asInstanceOf[Value] + ).asJava ) val row = MapFunctions.tupleToRow(tuple, defaultMapper, simpleSchema) val expected = Row( @@ -103,15 +103,14 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a tuple with array values" in { - val time = Instant.now().getEpochSecond - val tuple = new TarantoolTupleImpl( + val tupleFactoryWithSchema = new DefaultTarantoolTupleFactory(defaultMapper, TestSpaceWithArrayMetadata()) + val time = Instant.now() + val tuple = tupleFactoryWithSchema.create( Seq( - null, - defaultMapper.toValue(List(1, 2, 3, 4).asJava), - time - ).asJava, - defaultMapper, - TestSpaceWithArrayMetadata() + defaultMapper.toValue(Some(null).orNull).asInstanceOf[Value], + defaultMapper.toValue(List(1, 2, 3, 4).asJava).asInstanceOf[Value], + defaultMapper.toValue(time).asInstanceOf[Value] + ).asJava ) val row = MapFunctions.tupleToRow(tuple, defaultMapper, simpleSchemaWithArray) val expected = Row( @@ -127,15 +126,14 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a tuple with map values" in { - val time = Instant.now().getEpochSecond - val tuple = new TarantoolTupleImpl( + val tupleFactoryWithSchema = new DefaultTarantoolTupleFactory(defaultMapper, TestSpaceWithMapMetadata()) + val time = Instant.now() + val tuple = tupleFactoryWithSchema.create( Seq( - null, - defaultMapper.toValue(Map("host" -> "127.0.0.1", "port" -> "3301").asJava), - time - ).asJava, - defaultMapper, - TestSpaceWithMapMetadata() + defaultMapper.toValue(Some(null).orNull).asInstanceOf[Value], + defaultMapper.toValue(Map("host" -> "127.0.0.1", "port" -> "3301").asJava).asInstanceOf[Value], + defaultMapper.toValue(time).asInstanceOf[Value] + ).asJava ) val row = MapFunctions.tupleToRow(tuple, defaultMapper, simpleSchemaWithMap) val expected = Row( @@ -159,7 +157,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a row with simple values of different types" in { - val time = Instant.now().getEpochSecond + val time = Instant.now() val row = Row( "Akakiy", "Akakievich", @@ -201,7 +199,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a row with array values" in { - val time = Instant.now().getEpochSecond + val time = Instant.now() val row = Row( null, List(1, 2, 3, 4).asJava, @@ -228,7 +226,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a row with map values" in { - val time = Instant.now().getEpochSecond + val time = Instant.now() val row = Row( null, Map("host" -> "127.0.0.1", "port" -> "3301").asJava, diff --git a/src/test/scala/org/apache/spark/sql/tarantool/TarantoolSchemaSpec.scala b/src/test/scala/org/apache/spark/sql/tarantool/TarantoolSchemaSpec.scala index 75df4b2..80901a1 100644 --- a/src/test/scala/org/apache/spark/sql/tarantool/TarantoolSchemaSpec.scala +++ b/src/test/scala/org/apache/spark/sql/tarantool/TarantoolSchemaSpec.scala @@ -28,7 +28,7 @@ class TarantoolSchemaSpec extends AnyFlatSpec { DataTypes.createStructField("discount", DoubleType, true), DataTypes.createStructField("favourite_constant", DoubleType, true), DataTypes.createStructField("married", BooleanType, true), - DataTypes.createStructField("updated", LongType, true) + DataTypes.createStructField("updated", TimestampType, true) ).toArray ) val actual = schema.asStructType("testSpace")