From d5f45bd0dd74d6e95d863d0ad2fa3a51267d1537 Mon Sep 17 00:00:00 2001 From: Alexey Kuzin Date: Mon, 31 Jul 2023 20:11:19 -0400 Subject: [PATCH] Add support for Timestamp type fields Also update the dependencies to the newest possible versions. The java.time.Instant converter was added only in driver version 0.12.0 and works only with Tarantool 2.11.0+, so using Timestamp values will lead to errors on earlier Tarantool versions. For simplicity, and to avoid extra performance overhead, the converter does not perform any version checks. --- .github/workflows/scala.yml | 6 +- build.sbt | 20 +++--- .../spark/sql/tarantool/MapFunctions.scala | 22 +++--- .../spark/sql/tarantool/TarantoolSchema.scala | 1 + .../JavaSparkContextFunctionsTest.java | 4 +- src/test/resources/Dockerfile | 34 +++++---- ....12_2.8.yml => instances_2.11.12_2.11.yml} | 0 ....16_2.8.yml => instances_2.12.16_2.11.yml} | 0 ....10_2.8.yml => instances_2.13.10_2.11.yml} | 0 ...1.12_2.8.lua => topology_2.11.12_2.11.lua} | 0 ...2.16_2.8.lua => topology_2.12.16_2.11.lua} | 0 ...3.10_2.8.lua => topology_2.13.10_2.11.lua} | 0 .../api/metadata/TestSpaceMetadata.scala | 6 +- .../TarantoolCartridgeContainer.scala | 9 ++- .../TarantoolSparkReadClusterTest.scala | 4 +- .../TarantoolSparkWriteClusterTest.scala | 4 +- .../sql/tarantool/MapFunctionsSpec.scala | 72 +++++++++---------- .../sql/tarantool/TarantoolSchemaSpec.scala | 2 +- 18 files changed, 100 insertions(+), 84 deletions(-) rename src/test/resources/cartridge/{instances_2.11.12_2.8.yml => instances_2.11.12_2.11.yml} (100%) rename src/test/resources/cartridge/{instances_2.12.16_2.8.yml => instances_2.12.16_2.11.yml} (100%) rename src/test/resources/cartridge/{instances_2.13.10_2.8.yml => instances_2.13.10_2.11.yml} (100%) rename src/test/resources/cartridge/{topology_2.11.12_2.8.lua => topology_2.11.12_2.11.lua} (100%) rename src/test/resources/cartridge/{topology_2.12.16_2.8.lua => topology_2.12.16_2.11.lua} (100%) rename src/test/resources/cartridge/{topology_2.13.10_2.8.lua => topology_2.13.10_2.11.lua} (100%) diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index faff078..97d7df8 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -21,15 +21,15 @@ jobs: matrix: include: - scala: "2.11.12" - tarantool: "2.8" + tarantool: "2.11" router_port: "3301" router_api_port: "8081" - scala: "2.12.16" - tarantool: "2.8" + tarantool: "2.11" router_port: "3331" router_api_port: "8381" - scala: "2.13.10" - tarantool: "2.8" + tarantool: "2.11" router_port: "3361" router_api_port: "8681" steps: diff --git a/build.sbt b/build.sbt index 1fb4d7b..db23352 100644 --- a/build.sbt +++ b/build.sbt @@ -35,16 +35,17 @@ ThisBuild / developers := List( ThisBuild / scalaVersion := scala211 val commonDependencies = Seq( - "io.tarantool" % "cartridge-driver" % "0.10.1", - "junit" % "junit" % "4.12" % Test, - "com.github.sbt" % "junit-interface" % "0.12" % Test, - "org.scalatest" %% "scalatest" % "3.2.14" % Test, + "io.tarantool" % "cartridge-driver" % "0.12.0", + "junit" % "junit" % "4.13.2" % Test, + "com.github.sbt" % "junit-interface" % "0.13.3" % Test, + "org.scalatest" %% "scalatest" % "3.2.16" % Test, "org.scalamock" %% "scalamock" % "5.2.0" % Test, - "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.12" % Test, - "ch.qos.logback" % "logback-core" % "1.2.5" % Test, - "ch.qos.logback" % "logback-classic" % "1.2.5" % Test, + "com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.17" % Test, + "ch.qos.logback" % "logback-core" % 
"1.2.12" % Test, + "ch.qos.logback" % "logback-classic" % "1.2.12" % Test, "org.apache.derby" % "derby" % "10.11.1.1" % Test, - "io.tarantool" % "testcontainers-java-tarantool" % "0.5.3" % Test + "io.tarantool" % "testcontainers-java-tarantool" % "1.0.0" % Test, + "org.msgpack" % "msgpack-core" % "0.9.0" % Test ).map( _.exclude("io.netty", "netty-all") .exclude("io.netty", "netty-transport") @@ -116,7 +117,8 @@ lazy val root = (project in file(".")) // Test frameworks options testOptions ++= Seq( Tests.Argument(TestFrameworks.JUnit, "-v"), - Tests.Setup(() => System.setSecurityManager(null)) // SPARK-22918 + Tests.Setup(() => System.setSecurityManager(null)), // SPARK-22918 + Tests.Argument("-oF") ), // Publishing settings publishTo := { diff --git a/src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala b/src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala index 29eef0b..75fb2a8 100644 --- a/src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala +++ b/src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala @@ -19,6 +19,8 @@ import java.lang.{ Short => JShort } import java.util.{ArrayList => JList, HashMap => JMap} +import java.sql.Timestamp +import java.time.Instant import scala.collection.JavaConverters.{mapAsJavaMapConverter, seqAsJavaListConverter} /** @@ -75,15 +77,16 @@ object MapFunctions { def dataTypeToJavaClass(dataType: DataType): Class[_] = dataType match { - case StringType => classOf[java.lang.String] - case LongType => classOf[java.lang.Long] - case IntegerType => classOf[java.lang.Integer] - case ShortType => classOf[java.lang.Integer] - case ByteType => classOf[java.lang.Integer] - case BooleanType => classOf[java.lang.Boolean] - case DoubleType => classOf[java.lang.Double] - case FloatType => classOf[java.lang.Float] - case _: DecimalType => classOf[java.math.BigDecimal] + case StringType => classOf[java.lang.String] + case LongType => classOf[java.lang.Long] + case IntegerType => classOf[java.lang.Integer] + case ShortType => classOf[java.lang.Integer] + case ByteType => classOf[java.lang.Integer] + case BooleanType => classOf[java.lang.Boolean] + case DoubleType => classOf[java.lang.Double] + case FloatType => classOf[java.lang.Float] + case _: DecimalType => classOf[java.math.BigDecimal] + case _: TimestampType => classOf[java.time.Instant] case mapType: MapType => val keyClass = dataTypeToJavaClass(mapType.keyType) val valueClass = dataTypeToJavaClass(mapType.valueType) @@ -170,6 +173,7 @@ object MapFunctions { case value: Long => value.asInstanceOf[JLong] case value: Float => value.asInstanceOf[JFloat] case value: Double => value.asInstanceOf[JDouble] + case value: Timestamp => value.toInstant().asInstanceOf[Instant] case value: Any => identity(value) } } diff --git a/src/main/scala/org/apache/spark/sql/tarantool/TarantoolSchema.scala b/src/main/scala/org/apache/spark/sql/tarantool/TarantoolSchema.scala index f336de1..cd86c6d 100644 --- a/src/main/scala/org/apache/spark/sql/tarantool/TarantoolSchema.scala +++ b/src/main/scala/org/apache/spark/sql/tarantool/TarantoolSchema.scala @@ -74,6 +74,7 @@ object TarantoolFieldTypes extends Enumeration { val BOOLEAN: TarantoolFieldType = TarantoolFieldType("boolean", DataTypes.BooleanType) val DECIMAL: TarantoolFieldType = TarantoolFieldType("decimal", createDecimalType()) val UUID: TarantoolFieldType = TarantoolFieldType("uuid", DataTypes.StringType) + val DATETIME: TarantoolFieldType = TarantoolFieldType("datetime", DataTypes.TimestampType) val ARRAY: TarantoolFieldType = 
TarantoolFieldType("array", DataTypes.createArrayType(DataTypes.StringType, true)) diff --git a/src/test/java/io/tarantool/spark/connector/integration/JavaSparkContextFunctionsTest.java b/src/test/java/io/tarantool/spark/connector/integration/JavaSparkContextFunctionsTest.java index 5b361d8..1264aea 100644 --- a/src/test/java/io/tarantool/spark/connector/integration/JavaSparkContextFunctionsTest.java +++ b/src/test/java/io/tarantool/spark/connector/integration/JavaSparkContextFunctionsTest.java @@ -23,7 +23,7 @@ public class JavaSparkContextFunctionsTest extends SharedJavaSparkContext { @Before public void beforeEach() { try { - container.executeScript("test_setup.lua").get(); + container.executeScript("test_setup.lua"); } catch (Exception e) { throw new RuntimeException("Failed to set up test: ", e); } @@ -32,7 +32,7 @@ public void beforeEach() { @After public void afterEach() { try { - container.executeScript("test_teardown.lua").get(); + container.executeScript("test_teardown.lua"); } catch (Exception e) { throw new RuntimeException("Failed to set up test: ", e); } diff --git a/src/test/resources/Dockerfile b/src/test/resources/Dockerfile index 9145d83..4ad4c5f 100644 --- a/src/test/resources/Dockerfile +++ b/src/test/resources/Dockerfile @@ -1,19 +1,13 @@ -FROM tgagor/centos:stream8 AS tarantool-base -ARG TARANTOOL_VERSION=2.8 +ARG TARANTOOL_VERSION=2.11.0 +FROM tarantool/tarantool:${TARANTOOL_VERSION}-centos7 AS cartridge-base + ARG TARANTOOL_SERVER_USER="tarantool" ARG TARANTOOL_SERVER_UID=1000 ARG TARANTOOL_SERVER_GROUP="tarantool" ARG TARANTOOL_SERVER_GID=1000 -ARG TARANTOOL_WORKDIR="/app" -ARG TARANTOOL_RUNDIR="/tmp/run" -ARG TARANTOOL_DATADIR="/tmp/data" -ARG TARANTOOL_INSTANCES_FILE="./instances.yml" -ENV TARANTOOL_WORKDIR=$TARANTOOL_WORKDIR -ENV TARANTOOL_RUNDIR=$TARANTOOL_RUNDIR -ENV TARANTOOL_DATADIR=$TARANTOOL_DATADIR -ENV TARANTOOL_INSTANCES_FILE=$TARANTOOL_INSTANCES_FILE -RUN curl -L https://tarantool.io/installer.sh | VER=$TARANTOOL_VERSION /bin/bash -s -- --repo-only && \ - yum -y install cmake make gcc gcc-c++ git unzip tarantool tarantool-devel cartridge-cli && \ +# a yum bug requires setting ulimit, see https://bugzilla.redhat.com/show_bug.cgi?id=1537564 +RUN ulimit -n 1024 &&\ + yum -y install cmake make gcc gcc-c++ git unzip cartridge-cli && \ yum clean all RUN groupadd -g $TARANTOOL_SERVER_GID $TARANTOOL_SERVER_GROUP && \ useradd -u $TARANTOOL_SERVER_UID -g $TARANTOOL_SERVER_GID -m -s /bin/bash $TARANTOOL_SERVER_USER \ @@ -21,8 +15,20 @@ RUN groupadd -g $TARANTOOL_SERVER_GID $TARANTOOL_SERVER_GROUP && \ USER $TARANTOOL_SERVER_USER:$TARANTOOL_SERVER_GROUP RUN cartridge version -FROM tarantool-base AS cartridge-base +FROM cartridge-base AS cartridge-app +ARG TARANTOOL_WORKDIR="/app" +ARG TARANTOOL_RUNDIR="/tmp/run" +ARG TARANTOOL_DATADIR="/tmp/data" +ARG TARANTOOL_LOGDIR="/tmp/log" +ARG TARANTOOL_INSTANCES_FILE="./instances.yml" ARG TARANTOOL_CLUSTER_COOKIE="testapp-cluster-cookie" +ENV TARANTOOL_WORKDIR=$TARANTOOL_WORKDIR +ENV TARANTOOL_RUNDIR=$TARANTOOL_RUNDIR +ENV TARANTOOL_DATADIR=$TARANTOOL_DATADIR +ENV TARANTOOL_LOGDIR=$TARANTOOL_LOGDIR +ENV TARANTOOL_INSTANCES_FILE=$TARANTOOL_INSTANCES_FILE ENV TARANTOOL_CLUSTER_COOKIE=$TARANTOOL_CLUSTER_COOKIE WORKDIR $TARANTOOL_WORKDIR -CMD cartridge build && cartridge start --run-dir=$TARANTOOL_RUNDIR --data-dir=$TARANTOOL_DATADIR --cfg=$TARANTOOL_INSTANCES_FILE \ No newline at end of file +CMD cartridge build && \ + cartridge start --run-dir=$TARANTOOL_RUNDIR --data-dir=$TARANTOOL_DATADIR \ + --log-dir=$TARANTOOL_LOGDIR 
--cfg=$TARANTOOL_INSTANCES_FILE \ No newline at end of file diff --git a/src/test/resources/cartridge/instances_2.11.12_2.8.yml b/src/test/resources/cartridge/instances_2.11.12_2.11.yml similarity index 100% rename from src/test/resources/cartridge/instances_2.11.12_2.8.yml rename to src/test/resources/cartridge/instances_2.11.12_2.11.yml diff --git a/src/test/resources/cartridge/instances_2.12.16_2.8.yml b/src/test/resources/cartridge/instances_2.12.16_2.11.yml similarity index 100% rename from src/test/resources/cartridge/instances_2.12.16_2.8.yml rename to src/test/resources/cartridge/instances_2.12.16_2.11.yml diff --git a/src/test/resources/cartridge/instances_2.13.10_2.8.yml b/src/test/resources/cartridge/instances_2.13.10_2.11.yml similarity index 100% rename from src/test/resources/cartridge/instances_2.13.10_2.8.yml rename to src/test/resources/cartridge/instances_2.13.10_2.11.yml diff --git a/src/test/resources/cartridge/topology_2.11.12_2.8.lua b/src/test/resources/cartridge/topology_2.11.12_2.11.lua similarity index 100% rename from src/test/resources/cartridge/topology_2.11.12_2.8.lua rename to src/test/resources/cartridge/topology_2.11.12_2.11.lua diff --git a/src/test/resources/cartridge/topology_2.12.16_2.8.lua b/src/test/resources/cartridge/topology_2.12.16_2.11.lua similarity index 100% rename from src/test/resources/cartridge/topology_2.12.16_2.8.lua rename to src/test/resources/cartridge/topology_2.12.16_2.11.lua diff --git a/src/test/resources/cartridge/topology_2.13.10_2.8.lua b/src/test/resources/cartridge/topology_2.13.10_2.11.lua similarity index 100% rename from src/test/resources/cartridge/topology_2.13.10_2.8.lua rename to src/test/resources/cartridge/topology_2.13.10_2.11.lua diff --git a/src/test/scala/io/tarantool/driver/api/metadata/TestSpaceMetadata.scala b/src/test/scala/io/tarantool/driver/api/metadata/TestSpaceMetadata.scala index ac50df9..5b539ff 100644 --- a/src/test/scala/io/tarantool/driver/api/metadata/TestSpaceMetadata.scala +++ b/src/test/scala/io/tarantool/driver/api/metadata/TestSpaceMetadata.scala @@ -91,7 +91,7 @@ object TestSpaceMetadata { "discount" -> TestFieldMetadata("discount", "number", 6), "favourite_constant" -> TestFieldMetadata("favourite_constant", "double", 7), "married" -> TestFieldMetadata("married", "boolean", 8), - "updated" -> TestFieldMetadata("updated", "unsigned", 9) + "updated" -> TestFieldMetadata("updated", "datetime", 9) ) def apply(): TarantoolSpaceMetadata = @@ -103,7 +103,7 @@ object TestSpaceWithArrayMetadata { private val spaceFieldMetadata = Map( "order_id" -> TestFieldMetadata("order_id", "string", 0), "order_items" -> TestFieldMetadata("order_items", "array", 1), - "updated" -> TestFieldMetadata("updated", "integer", 2) + "updated" -> TestFieldMetadata("updated", "datetime", 2) ) def apply(): TarantoolSpaceMetadata = @@ -115,7 +115,7 @@ object TestSpaceWithMapMetadata { private val spaceFieldMetadata = Map( "id" -> TestFieldMetadata("order_id", "string", 0), "settings" -> TestFieldMetadata("order_items", "map", 1), - "updated" -> TestFieldMetadata("updated", "integer", 2) + "updated" -> TestFieldMetadata("updated", "datetime", 2) ) def apply(): TarantoolSpaceMetadata = diff --git a/src/test/scala/io/tarantool/spark/connector/containers/TarantoolCartridgeContainer.scala b/src/test/scala/io/tarantool/spark/connector/containers/TarantoolCartridgeContainer.scala index 41ef93e..f09fda7 100644 --- a/src/test/scala/io/tarantool/spark/connector/containers/TarantoolCartridgeContainer.scala +++ 
b/src/test/scala/io/tarantool/spark/connector/containers/TarantoolCartridgeContainer.scala @@ -5,6 +5,7 @@ import org.slf4j.{Logger, LoggerFactory} import org.testcontainers.containers.output.Slf4jLogConsumer import org.testcontainers.containers.{TarantoolCartridgeContainer => JavaTarantoolCartridgeContainer} import org.testcontainers.containers.wait.strategy.Wait +import org.testcontainers.containers.Container import scala.collection.JavaConverters.mapAsJavaMapConverter @@ -51,8 +52,12 @@ case class TarantoolCartridgeContainer( def getRouterPort: Int = container.getRouterPort def getAPIPort: Int = container.getAPIPort - def executeScript(scriptResourcePath: String): CompletableFuture[util.List[_]] = - container.executeScript(scriptResourcePath) + def executeScript(scriptResourcePath: String): Unit = { + val result = container.executeScript(scriptResourcePath) + if (result.getExitCode != 0) { + throw new RuntimeException("Failed to execute script. Details:\n" + result.getStderr) + } + } } object TarantoolCartridgeContainer { diff --git a/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkReadClusterTest.scala b/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkReadClusterTest.scala index 1739968..1aa662a 100644 --- a/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkReadClusterTest.scala +++ b/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkReadClusterTest.scala @@ -28,11 +28,11 @@ class TarantoolSparkReadClusterTest extends AnyFunSuite with Matchers with Taran override protected def beforeEach(): Unit = { super.beforeEach() - SharedSparkContext.container.executeScript("test_setup.lua").get + SharedSparkContext.container.executeScript("test_setup.lua") } override protected def afterEach(): Unit = { - SharedSparkContext.container.executeScript("test_teardown.lua").get + SharedSparkContext.container.executeScript("test_teardown.lua") super.afterEach() } diff --git a/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkWriteClusterTest.scala b/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkWriteClusterTest.scala index 02c3ed5..f8b1176 100644 --- a/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkWriteClusterTest.scala +++ b/src/test/scala/io/tarantool/spark/connector/integration/TarantoolSparkWriteClusterTest.scala @@ -129,7 +129,7 @@ class TarantoolSparkWriteClusterTest extends AnyFunSuite with Matchers with Tara thrownException.getMessage should include("already exists in Tarantool") // Clear the data and check that they are written in ErrorIfExists mode - SharedSparkContext.container.executeScript("test_teardown.lua").get() + SharedSparkContext.container.executeScript("test_teardown.lua") df = SharedSparkContext.spark.createDataFrame( SharedSparkContext.spark.sparkContext.parallelize( @@ -175,7 +175,7 @@ class TarantoolSparkWriteClusterTest extends AnyFunSuite with Matchers with Tara actual.foreach(item => item.getString("order_type") should endWith("444")) // Clear the data and check if they are written in Ignore mode - SharedSparkContext.container.executeScript("test_teardown.lua").get() + SharedSparkContext.container.executeScript("test_teardown.lua") df.write .format("org.apache.spark.sql.tarantool") diff --git a/src/test/scala/org/apache/spark/sql/tarantool/MapFunctionsSpec.scala b/src/test/scala/org/apache/spark/sql/tarantool/MapFunctionsSpec.scala index d142598..4979891 100644 --- 
a/src/test/scala/org/apache/spark/sql/tarantool/MapFunctionsSpec.scala +++ b/src/test/scala/org/apache/spark/sql/tarantool/MapFunctionsSpec.scala @@ -10,6 +10,7 @@ import org.scalatest.Inspectors.forAll import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.msgpack.value.Value import java.time.Instant import scala.collection.JavaConverters.{iterableAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter} @@ -35,7 +36,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { StructField("discount", FloatType, nullable = true), StructField("favourite_constant", DoubleType, nullable = true), StructField("married", BooleanType, nullable = true), - StructField("updated", LongType, nullable = true) + StructField("updated", TimestampType, nullable = true) ) ) @@ -43,7 +44,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { Array( StructField("order_id", StringType, nullable = true), StructField("order_items", DataTypes.createArrayType(IntegerType), nullable = true), - StructField("updated", LongType, nullable = true) + StructField("updated", TimestampType, nullable = true) ) ) @@ -51,7 +52,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { Array( StructField("id", StringType, nullable = true), StructField("settings", DataTypes.createMapType(StringType, StringType), nullable = true), - StructField("updated", LongType, nullable = true) + StructField("updated", TimestampType, nullable = true) ) ) @@ -65,22 +66,21 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a tuple with simple values of different types" in { - val time = Instant.now().getEpochSecond - val tuple = new TarantoolTupleImpl( + val tupleFactoryWithSchema = new DefaultTarantoolTupleFactory(defaultMapper, TestSpaceMetadata()) + val time = Instant.now() + val tuple = tupleFactoryWithSchema.create( Seq( - "Akakiy", - "Akakievich", - "Ivanov", - null, - 38, - new java.math.BigDecimal(200), - 0.5f, - Math.PI, - false, - time - ).asJava, - defaultMapper, - TestSpaceMetadata() + defaultMapper.toValue("Akakiy").asInstanceOf[Value], + defaultMapper.toValue("Akakievich").asInstanceOf[Value], + defaultMapper.toValue("Ivanov").asInstanceOf[Value], + defaultMapper.toValue(null).asInstanceOf[Value], + defaultMapper.toValue(38).asInstanceOf[Value], + defaultMapper.toValue(new java.math.BigDecimal(200)).asInstanceOf[Value], + defaultMapper.toValue(0.5f).asInstanceOf[Value], + defaultMapper.toValue(Math.PI).asInstanceOf[Value], + defaultMapper.toValue(false).asInstanceOf[Value], + defaultMapper.toValue(time).asInstanceOf[Value] + ).asJava ) val row = MapFunctions.tupleToRow(tuple, defaultMapper, simpleSchema) val expected = Row( @@ -103,15 +103,14 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a tuple with array values" in { - val time = Instant.now().getEpochSecond - val tuple = new TarantoolTupleImpl( + val tupleFactoryWithSchema = new DefaultTarantoolTupleFactory(defaultMapper, TestSpaceWithArrayMetadata()) + val time = Instant.now() + val tuple = tupleFactoryWithSchema.create( Seq( - null, - defaultMapper.toValue(List(1, 2, 3, 4).asJava), - time - ).asJava, - defaultMapper, - TestSpaceWithArrayMetadata() + defaultMapper.toValue(Some(null).orNull).asInstanceOf[Value], + defaultMapper.toValue(List(1, 2, 3, 4).asJava).asInstanceOf[Value], + defaultMapper.toValue(time).asInstanceOf[Value] + ).asJava ) val row = MapFunctions.tupleToRow(tuple, defaultMapper, simpleSchemaWithArray) val expected = 
Row( @@ -127,15 +126,14 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a tuple with map values" in { - val time = Instant.now().getEpochSecond - val tuple = new TarantoolTupleImpl( + val tupleFactoryWithSchema = new DefaultTarantoolTupleFactory(defaultMapper, TestSpaceWithMapMetadata()) + val time = Instant.now() + val tuple = tupleFactoryWithSchema.create( Seq( - null, - defaultMapper.toValue(Map("host" -> "127.0.0.1", "port" -> "3301").asJava), - time - ).asJava, - defaultMapper, - TestSpaceWithMapMetadata() + defaultMapper.toValue(Some(null).orNull).asInstanceOf[Value], + defaultMapper.toValue(Map("host" -> "127.0.0.1", "port" -> "3301").asJava).asInstanceOf[Value], + defaultMapper.toValue(time).asInstanceOf[Value] + ).asJava ) val row = MapFunctions.tupleToRow(tuple, defaultMapper, simpleSchemaWithMap) val expected = Row( @@ -159,7 +157,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a row with simple values of different types" in { - val time = Instant.now().getEpochSecond + val time = Instant.now() val row = Row( "Akakiy", "Akakievich", @@ -201,7 +199,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a row with array values" in { - val time = Instant.now().getEpochSecond + val time = Instant.now() val row = Row( null, List(1, 2, 3, 4).asJava, @@ -228,7 +226,7 @@ class MapFunctionsSpec extends AnyFlatSpec with Matchers { } it should "convert a row with map values" in { - val time = Instant.now().getEpochSecond + val time = Instant.now() val row = Row( null, Map("host" -> "127.0.0.1", "port" -> "3301").asJava, diff --git a/src/test/scala/org/apache/spark/sql/tarantool/TarantoolSchemaSpec.scala b/src/test/scala/org/apache/spark/sql/tarantool/TarantoolSchemaSpec.scala index 75df4b2..80901a1 100644 --- a/src/test/scala/org/apache/spark/sql/tarantool/TarantoolSchemaSpec.scala +++ b/src/test/scala/org/apache/spark/sql/tarantool/TarantoolSchemaSpec.scala @@ -28,7 +28,7 @@ class TarantoolSchemaSpec extends AnyFlatSpec { DataTypes.createStructField("discount", DoubleType, true), DataTypes.createStructField("favourite_constant", DoubleType, true), DataTypes.createStructField("married", BooleanType, true), - DataTypes.createStructField("updated", LongType, true) + DataTypes.createStructField("updated", TimestampType, true) ).toArray ) val actual = schema.asStructType("testSpace")
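
---

A usage sketch of the new Timestamp support, for reviewers. It is illustrative only: the space name "orders", the field layout, and the option keys "tarantool.hosts" / "tarantool.space" are assumptions about the surrounding setup rather than part of this patch; the format string "org.apache.spark.sql.tarantool" and the Timestamp-to-Instant conversion come from the changes above. As the commit message notes, this write path requires cartridge-driver 0.12.0+ and Tarantool 2.11.0+.

import java.sql.Timestamp
import java.time.Instant

import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types._

object TimestampWriteSketch extends App {

  // "tarantool.hosts" is an assumed configuration key; check the connector
  // docs for the exact option names in your version.
  val spark = SparkSession
    .builder()
    .appName("timestamp-write-sketch")
    .master("local[*]")
    .config("tarantool.hosts", "127.0.0.1:3301")
    .getOrCreate()

  // A TimestampType column maps to a Tarantool 'datetime' field. On write,
  // java.sql.Timestamp values are converted to java.time.Instant, which the
  // driver serializes natively starting from version 0.12.0.
  val schema = StructType(
    Seq(
      StructField("order_id", StringType, nullable = false),
      StructField("updated", TimestampType, nullable = true)
    )
  )

  val rows = Seq(Row("order-1", Timestamp.from(Instant.now())))
  val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

  // "orders" is a placeholder space; it must declare 'updated' as 'datetime'.
  df.write
    .format("org.apache.spark.sql.tarantool")
    .option("tarantool.space", "orders")
    .mode(SaveMode.Append)
    .save()

  spark.stop()
}

On read, the mapping added in TarantoolSchema.scala resolves 'datetime' fields to DataTypes.TimestampType, so the same DataFrame shape comes back from spark.read with this format.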