Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow passing the connections option #34

Merged
merged 2 commits into from
Aug 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ libraryDependencies += "io.tarantool" %% "spark-tarantool-connector" % "0.4.0"
| tarantool.connectTimeout | server connect timeout, in milliseconds | 1000 |
| tarantool.readTimeout | socket read timeout, in milliseconds | 1000 |
| tarantool.requestTimeout | request completion timeout, in milliseconds | 2000 |
| tarantool.connections | number of connections established with each host | 1 |
| tarantool.cursorBatchSize | default limit for prefetching tuples in RDD iterator | 1000 |

### Dataset API request options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ case class Timeouts(connect: Option[Int], read: Option[Int], request: Option[Int
case class TarantoolConfig(
hosts: Seq[TarantoolServerAddress],
credentials: Option[Credentials],
timeouts: Timeouts
timeouts: Timeouts,
connections: Option[Int]
) extends Serializable

object TarantoolConfig {
Expand All @@ -27,6 +28,7 @@ object TarantoolConfig {
private val REQUEST_TIMEOUT = PREFIX + "requestTimeout"

private val HOSTS = PREFIX + "hosts"
private val CONNECTIONS = PREFIX + "connections"

//options with spark. prefix
private val SPARK_USERNAME = SPARK_PREFIX + USERNAME
Expand All @@ -37,9 +39,15 @@ object TarantoolConfig {
private val SPARK_REQUEST_TIMEOUT = SPARK_PREFIX + REQUEST_TIMEOUT

private val SPARK_HOSTS = SPARK_PREFIX + HOSTS
private val SPARK_CONNECTIONS = SPARK_PREFIX + CONNECTIONS

def apply(cfg: SparkConf): TarantoolConfig =
TarantoolConfig(parseHosts(cfg), parseCredentials(cfg), parseTimeouts(cfg))
TarantoolConfig(
parseHosts(cfg),
parseCredentials(cfg),
parseTimeouts(cfg),
parseIntOption(cfg, CONNECTIONS, SPARK_CONNECTIONS)
)

def parseCredentials(cfg: SparkConf): Option[Credentials] = {
val username = cfg.getOption(USERNAME).orElse(cfg.getOption(SPARK_USERNAME))
Expand Down Expand Up @@ -70,11 +78,11 @@ object TarantoolConfig {

def parseTimeouts(cfg: SparkConf): Timeouts =
Timeouts(
parseTimeout(cfg, CONNECT_TIMEOUT, SPARK_CONNECT_TIMEOUT),
parseTimeout(cfg, READ_TIMEOUT, SPARK_READ_TIMEOUT),
parseTimeout(cfg, REQUEST_TIMEOUT, SPARK_REQUEST_TIMEOUT)
parseIntOption(cfg, CONNECT_TIMEOUT, SPARK_CONNECT_TIMEOUT),
parseIntOption(cfg, READ_TIMEOUT, SPARK_READ_TIMEOUT),
parseIntOption(cfg, REQUEST_TIMEOUT, SPARK_REQUEST_TIMEOUT)
)

def parseTimeout(cfg: SparkConf, name: String, nameWithSparkPrefix: String): Option[Int] =
def parseIntOption(cfg: SparkConf, name: String, nameWithSparkPrefix: String): Option[Int] =
cfg.getOption(name).orElse(cfg.getOption(nameWithSparkPrefix)).map(_.toInt)
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ class TarantoolConnection[T <: Packable, R <: util.Collection[T]](
builder.withRequestTimeout(cnf.timeouts.request.get)
}

if (cnf.connections.isDefined) {
builder.withConnections(cnf.connections.get)
}

builder
}

Expand Down
18 changes: 9 additions & 9 deletions src/test/resources/cartridge/instances.yml
Original file line number Diff line number Diff line change
@@ -1,34 +1,34 @@
testapp.router:
workdir: ./tmp/db_dev/3301
advertise_uri: localhost:3301
workdir: /tmp/db_dev/3301
advertise_uri: 0.0.0.0:3301
http_port: 8081

testapp.second-router:
workdir: ./tmp/db_dev/3311
advertise_uri: localhost:3311
workdir: /tmp/db_dev/3311
advertise_uri: 0.0.0.0:3311
http_port: 8091

testapp.s1-master:
workdir: ./tmp/db_dev/3302
workdir: /tmp/db_dev/3302
advertise_uri: localhost:3302
http_port: 8082

testapp.s1-replica:
workdir: ./tmp/db_dev/3303
workdir: /tmp/db_dev/3303
advertise_uri: localhost:3303
http_port: 8083

testapp.s2-master:
workdir: ./tmp/db_dev/3304
workdir: /tmp/db_dev/3304
advertise_uri: localhost:3304
http_port: 8084

testapp.s2-replica:
workdir: ./tmp/db_dev/3305
workdir: /tmp/db_dev/3305
advertise_uri: localhost:3305
http_port: 8085

testapp-stateboard:
workdir: ./tmp/db_dev/3310
workdir: /tmp/db_dev/3310
listen: localhost:3310
password: passwd
4 changes: 2 additions & 2 deletions src/test/resources/cartridge/topology.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ cartridge = require('cartridge')
replicasets = {{
alias = 'app-router',
roles = {'vshard-router', 'app.roles.api_router'},
join_servers = {{uri = 'localhost:3301'}}
join_servers = {{uri = '0.0.0.0:3301'}}
}, {
alias = 'app-router-second',
roles = {'vshard-router', 'app.roles.api_router'},
join_servers = {{uri = 'localhost:3311'}}
join_servers = {{uri = '0.0.0.0:3311'}}
}, {
alias = 's1-storage',
roles = {'vshard-storage', 'app.roles.api_storage'},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TarantoolConfigSpec extends AnyFlatSpec with Matchers {
tConf.hosts should equal(Array(new TarantoolServerAddress("127.0.0.1:3301")))
tConf.credentials should equal(None)
tConf.timeouts should equal(Timeouts(None, None, None))
tConf.connections should equal(None)
}

it should "apply settings from options with priority" in {
Expand Down Expand Up @@ -62,9 +63,11 @@ class TarantoolConfigSpec extends AnyFlatSpec with Matchers {
.set("tarantool.hosts", "127.0.0.3:3303")
.set("tarantool.username", "aaaaa")
.set("tarantool.password", "bbbbb")
.set("tarantool.connections", "123")
val tConf: TarantoolConfig = TarantoolConfig(sparkConf)
tConf.hosts should equal(Array(new TarantoolServerAddress("127.0.0.3:3303")))
tConf.credentials should equal(Some(Credentials("aaaaa", "bbbbb")))
tConf.timeouts should equal(Timeouts(Some(10), Some(20), Some(30)))
tConf.connections should equal(Some(123))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TarantoolConnectionSpec
with Matchers
with TarantoolSparkClusterTestSuite {

"Client in Connection" should " be initialized only once" in {
"Client in TarantoolConnection" should " be initialized only once" in {
val conn1 = TarantoolConnection()
val conn2 = TarantoolConnection()

Expand All @@ -28,4 +28,25 @@ class TarantoolConnectionSpec
conn2.client(conf) should (not(equal(client)))
conn2.close()
}

"Client in TarantoolConnection" should " have the specified settings" in {
val sparkConf = SharedSparkContext.sc.getConf
.clone()
.set("tarantool.space", "test_space")
.set("tarantool.connectTimeout", "10")
.set("tarantool.readTimeout", "20")
.set("tarantool.requestTimeout", "30")
.set("tarantool.connections", "3")
val tConf: TarantoolConfig = TarantoolConfig(sparkConf)
val conn = TarantoolConnection()
val client = conn.client(tConf)
val actualConfig = client.getConfig()

actualConfig.getConnectTimeout() should equal(10)
actualConfig.getReadTimeout() should equal(20)
actualConfig.getRequestTimeout() should equal(30)
actualConfig.getConnections() should equal(3)

conn.close()
}
}