Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Timestamp type fields #53

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ jobs:
matrix:
include:
- scala: "2.11.12"
tarantool: "2.8"
tarantool: "2.11"
router_port: "3301"
router_api_port: "8081"
- scala: "2.12.16"
tarantool: "2.8"
tarantool: "2.11"
router_port: "3331"
router_api_port: "8381"
- scala: "2.13.10"
tarantool: "2.8"
tarantool: "2.11"
router_port: "3361"
router_api_port: "8681"
steps:
Expand Down
20 changes: 11 additions & 9 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,17 @@ ThisBuild / developers := List(
ThisBuild / scalaVersion := scala211

val commonDependencies = Seq(
"io.tarantool" % "cartridge-driver" % "0.10.1",
"junit" % "junit" % "4.12" % Test,
"com.github.sbt" % "junit-interface" % "0.12" % Test,
"org.scalatest" %% "scalatest" % "3.2.14" % Test,
"io.tarantool" % "cartridge-driver" % "0.12.0",
"junit" % "junit" % "4.13.2" % Test,
"com.github.sbt" % "junit-interface" % "0.13.3" % Test,
"org.scalatest" %% "scalatest" % "3.2.16" % Test,
"org.scalamock" %% "scalamock" % "5.2.0" % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.12" % Test,
"ch.qos.logback" % "logback-core" % "1.2.5" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.5" % Test,
"com.dimafeng" %% "testcontainers-scala-scalatest" % "0.40.17" % Test,
"ch.qos.logback" % "logback-core" % "1.2.12" % Test,
"ch.qos.logback" % "logback-classic" % "1.2.12" % Test,
"org.apache.derby" % "derby" % "10.11.1.1" % Test,
"io.tarantool" % "testcontainers-java-tarantool" % "0.5.3" % Test
"io.tarantool" % "testcontainers-java-tarantool" % "1.0.0" % Test,
"org.msgpack" % "msgpack-core" % "0.9.0" % Test
).map(
_.exclude("io.netty", "netty-all")
.exclude("io.netty", "netty-transport")
Expand Down Expand Up @@ -116,7 +117,8 @@ lazy val root = (project in file("."))
// Test frameworks options
testOptions ++= Seq(
Tests.Argument(TestFrameworks.JUnit, "-v"),
Tests.Setup(() => System.setSecurityManager(null)) // SPARK-22918
Tests.Setup(() => System.setSecurityManager(null)), // SPARK-22918
Tests.Argument("-oF")
),
// Publishing settings
publishTo := {
Expand Down
22 changes: 13 additions & 9 deletions src/main/scala/org/apache/spark/sql/tarantool/MapFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import java.lang.{
Short => JShort
}
import java.util.{ArrayList => JList, HashMap => JMap}
import java.sql.Timestamp
import java.time.Instant
import scala.collection.JavaConverters.{mapAsJavaMapConverter, seqAsJavaListConverter}

/**
Expand Down Expand Up @@ -75,15 +77,16 @@ object MapFunctions {

def dataTypeToJavaClass(dataType: DataType): Class[_] =
dataType match {
case StringType => classOf[java.lang.String]
case LongType => classOf[java.lang.Long]
case IntegerType => classOf[java.lang.Integer]
case ShortType => classOf[java.lang.Integer]
case ByteType => classOf[java.lang.Integer]
case BooleanType => classOf[java.lang.Boolean]
case DoubleType => classOf[java.lang.Double]
case FloatType => classOf[java.lang.Float]
case _: DecimalType => classOf[java.math.BigDecimal]
case StringType => classOf[java.lang.String]
case LongType => classOf[java.lang.Long]
case IntegerType => classOf[java.lang.Integer]
case ShortType => classOf[java.lang.Integer]
case ByteType => classOf[java.lang.Integer]
case BooleanType => classOf[java.lang.Boolean]
case DoubleType => classOf[java.lang.Double]
case FloatType => classOf[java.lang.Float]
case _: DecimalType => classOf[java.math.BigDecimal]
case _: TimestampType => classOf[java.time.Instant]
case mapType: MapType =>
val keyClass = dataTypeToJavaClass(mapType.keyType)
val valueClass = dataTypeToJavaClass(mapType.valueType)
Expand Down Expand Up @@ -170,6 +173,7 @@ object MapFunctions {
case value: Long => value.asInstanceOf[JLong]
case value: Float => value.asInstanceOf[JFloat]
case value: Double => value.asInstanceOf[JDouble]
case value: Timestamp => value.toInstant().asInstanceOf[Instant]
case value: Any => identity(value)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ object TarantoolFieldTypes extends Enumeration {
val BOOLEAN: TarantoolFieldType = TarantoolFieldType("boolean", DataTypes.BooleanType)
val DECIMAL: TarantoolFieldType = TarantoolFieldType("decimal", createDecimalType())
val UUID: TarantoolFieldType = TarantoolFieldType("uuid", DataTypes.StringType)
val DATETIME: TarantoolFieldType = TarantoolFieldType("datetime", DataTypes.TimestampType)

val ARRAY: TarantoolFieldType =
TarantoolFieldType("array", DataTypes.createArrayType(DataTypes.StringType, true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class JavaSparkContextFunctionsTest extends SharedJavaSparkContext {
@Before
public void beforeEach() {
try {
container.executeScript("test_setup.lua").get();
container.executeScript("test_setup.lua");
} catch (Exception e) {
throw new RuntimeException("Failed to set up test: ", e);
}
Expand All @@ -32,7 +32,7 @@ public void beforeEach() {
@After
public void afterEach() {
try {
container.executeScript("test_teardown.lua").get();
container.executeScript("test_teardown.lua");
} catch (Exception e) {
throw new RuntimeException("Failed to set up test: ", e);
}
Expand Down
34 changes: 20 additions & 14 deletions src/test/resources/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
FROM tgagor/centos:stream8 AS tarantool-base
ARG TARANTOOL_VERSION=2.8
ARG TARANTOOL_VERSION=2.11.0
FROM tarantool/tarantool:${TARANTOOL_VERSION}-centos7 AS cartridge-base

ARG TARANTOOL_SERVER_USER="tarantool"
ARG TARANTOOL_SERVER_UID=1000
ARG TARANTOOL_SERVER_GROUP="tarantool"
ARG TARANTOOL_SERVER_GID=1000
ARG TARANTOOL_WORKDIR="/app"
ARG TARANTOOL_RUNDIR="/tmp/run"
ARG TARANTOOL_DATADIR="/tmp/data"
ARG TARANTOOL_INSTANCES_FILE="./instances.yml"
ENV TARANTOOL_WORKDIR=$TARANTOOL_WORKDIR
ENV TARANTOOL_RUNDIR=$TARANTOOL_RUNDIR
ENV TARANTOOL_DATADIR=$TARANTOOL_DATADIR
ENV TARANTOOL_INSTANCES_FILE=$TARANTOOL_INSTANCES_FILE
RUN curl -L https://tarantool.io/installer.sh | VER=$TARANTOOL_VERSION /bin/bash -s -- --repo-only && \
yum -y install cmake make gcc gcc-c++ git unzip tarantool tarantool-devel cartridge-cli && \
# a yum bug requires setting ulimit, see https://bugzilla.redhat.com/show_bug.cgi?id=1537564
RUN ulimit -n 1024 &&\
yum -y install cmake make gcc gcc-c++ git unzip cartridge-cli && \
yum clean all
RUN groupadd -g $TARANTOOL_SERVER_GID $TARANTOOL_SERVER_GROUP && \
useradd -u $TARANTOOL_SERVER_UID -g $TARANTOOL_SERVER_GID -m -s /bin/bash $TARANTOOL_SERVER_USER \
|| true
USER $TARANTOOL_SERVER_USER:$TARANTOOL_SERVER_GROUP
RUN cartridge version

FROM tarantool-base AS cartridge-base
FROM cartridge-base AS cartridge-app
ARG TARANTOOL_WORKDIR="/app"
ARG TARANTOOL_RUNDIR="/tmp/run"
ARG TARANTOOL_DATADIR="/tmp/data"
ARG TARANTOOL_LOGDIR="/tmp/log"
ARG TARANTOOL_INSTANCES_FILE="./instances.yml"
ARG TARANTOOL_CLUSTER_COOKIE="testapp-cluster-cookie"
ENV TARANTOOL_WORKDIR=$TARANTOOL_WORKDIR
ENV TARANTOOL_RUNDIR=$TARANTOOL_RUNDIR
ENV TARANTOOL_DATADIR=$TARANTOOL_DATADIR
ENV TARANTOOL_LOGDIR=$TARANTOOL_LOGDIR
ENV TARANTOOL_INSTANCES_FILE=$TARANTOOL_INSTANCES_FILE
ENV TARANTOOL_CLUSTER_COOKIE=$TARANTOOL_CLUSTER_COOKIE
WORKDIR $TARANTOOL_WORKDIR
CMD cartridge build && cartridge start --run-dir=$TARANTOOL_RUNDIR --data-dir=$TARANTOOL_DATADIR --cfg=$TARANTOOL_INSTANCES_FILE
CMD cartridge build && \
cartridge start --run-dir=$TARANTOOL_RUNDIR --data-dir=$TARANTOOL_DATADIR \
--log-dir=$TARANTOOL_LOGDIR --cfg=$TARANTOOL_INSTANCES_FILE
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ object TestSpaceMetadata {
"discount" -> TestFieldMetadata("discount", "number", 6),
"favourite_constant" -> TestFieldMetadata("favourite_constant", "double", 7),
"married" -> TestFieldMetadata("married", "boolean", 8),
"updated" -> TestFieldMetadata("updated", "unsigned", 9)
"updated" -> TestFieldMetadata("updated", "datetime", 9)
)

def apply(): TarantoolSpaceMetadata =
Expand All @@ -103,7 +103,7 @@ object TestSpaceWithArrayMetadata {
private val spaceFieldMetadata = Map(
"order_id" -> TestFieldMetadata("order_id", "string", 0),
"order_items" -> TestFieldMetadata("order_items", "array", 1),
"updated" -> TestFieldMetadata("updated", "integer", 2)
"updated" -> TestFieldMetadata("updated", "datetime", 2)
)

def apply(): TarantoolSpaceMetadata =
Expand All @@ -115,7 +115,7 @@ object TestSpaceWithMapMetadata {
private val spaceFieldMetadata = Map(
"id" -> TestFieldMetadata("order_id", "string", 0),
"settings" -> TestFieldMetadata("order_items", "map", 1),
"updated" -> TestFieldMetadata("updated", "integer", 2)
"updated" -> TestFieldMetadata("updated", "datetime", 2)
)

def apply(): TarantoolSpaceMetadata =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.slf4j.{Logger, LoggerFactory}
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.containers.{TarantoolCartridgeContainer => JavaTarantoolCartridgeContainer}
import org.testcontainers.containers.wait.strategy.Wait
import org.testcontainers.containers.Container

import scala.collection.JavaConverters.mapAsJavaMapConverter

Expand Down Expand Up @@ -51,8 +52,12 @@ case class TarantoolCartridgeContainer(
def getRouterPort: Int = container.getRouterPort
def getAPIPort: Int = container.getAPIPort

def executeScript(scriptResourcePath: String): CompletableFuture[util.List[_]] =
container.executeScript(scriptResourcePath)
def executeScript(scriptResourcePath: String): Unit = {
val result = container.executeScript(scriptResourcePath)
if (result.getExitCode != 0) {
throw new RuntimeException("Failed to execute script. Details:\n" + result.getStderr)
}
}
}

object TarantoolCartridgeContainer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class TarantoolSparkReadClusterTest extends AnyFunSuite with Matchers with Taran

override protected def beforeEach(): Unit = {
super.beforeEach()
SharedSparkContext.container.executeScript("test_setup.lua").get
SharedSparkContext.container.executeScript("test_setup.lua")
}

override protected def afterEach(): Unit = {
SharedSparkContext.container.executeScript("test_teardown.lua").get
SharedSparkContext.container.executeScript("test_teardown.lua")
super.afterEach()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class TarantoolSparkWriteClusterTest extends AnyFunSuite with Matchers with Tara
thrownException.getMessage should include("already exists in Tarantool")

// Clear the data and check that they are written in ErrorIfExists mode
SharedSparkContext.container.executeScript("test_teardown.lua").get()
SharedSparkContext.container.executeScript("test_teardown.lua")

df = SharedSparkContext.spark.createDataFrame(
SharedSparkContext.spark.sparkContext.parallelize(
Expand Down Expand Up @@ -175,7 +175,7 @@ class TarantoolSparkWriteClusterTest extends AnyFunSuite with Matchers with Tara
actual.foreach(item => item.getString("order_type") should endWith("444"))

// Clear the data and check if they are written in Ignore mode
SharedSparkContext.container.executeScript("test_teardown.lua").get()
SharedSparkContext.container.executeScript("test_teardown.lua")

df.write
.format("org.apache.spark.sql.tarantool")
Expand Down
Loading
Loading