diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index 6835170ae07f5..634df4ea7bfc4 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -546,18 +546,26 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll { } } - test("construction of RpcTimeout using properties") { + test("construct RpcTimeout with conf property") { val conf = new SparkConf val testProp = "spark.ask.test.timeout" val testDurationSeconds = 30 + val secondaryProp = "spark.ask.secondary.timeout" conf.set(testProp, testDurationSeconds.toString + "s") + conf.set(secondaryProp, "100s") - val rt = RpcTimeout(conf, testProp) - assert( testDurationSeconds === rt.duration.toSeconds ) + // Construct RpcTimeout with a single property + val rt1 = RpcTimeout(conf, testProp) + assert( testDurationSeconds === rt1.duration.toSeconds ) - val ex = intercept[Throwable] { + // Construct RpcTimeout with prioritized list of properties + val rt2 = RpcTimeout(conf, Seq("spark.ask.invalid.timeout", testProp, secondaryProp), "1s") + assert( testDurationSeconds === rt2.duration.toSeconds ) + + // Try to construct RpcTimeout with an unconfigured property + intercept[Throwable] { RpcTimeout(conf, "spark.ask.invalid.timeout") } } diff --git a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala index caa069a3f37ea..a5b0f1e5aa3e6 100644 --- a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala @@ -59,72 +59,60 @@ class AkkaRpcEnvSuite extends RpcEnvSuite { } } - test("Future failure with RpcTimeout") { + test("timeout on ask Future with RpcTimeout") { - class EchoActor extends Actor { + class EchoActor(sleepDuration: Long) extends Actor { def receive: Receive = { case msg => - Thread.sleep(500) + Thread.sleep(sleepDuration) sender() ! msg } } val system = ActorSystem("EchoSystem") - val echoActor = system.actorOf(Props(new EchoActor), name = "echoA") + val echoActor = system.actorOf(Props(new EchoActor(0)), name = "echo") + val sleepyActor = system.actorOf(Props(new EchoActor(50)), name = "sleepy") - val timeout = new RpcTimeout(50 millis, "spark.rpc.short.timeout") + val shortProp = "spark.rpc.short.timeout" + val timeout = new RpcTimeout(10 millis, shortProp) - val fut = echoActor.ask("hello")(1000 millis).mapTo[String].recover { - case te: TimeoutException => throw timeout.amend(te) - } + try { - fut.onFailure { - case te: TimeoutException => println("failed with timeout exception") - } + // Ask with immediate response + var fut = echoActor.ask("hello")(timeout.duration).mapTo[String]. + recover(timeout.addMessageIfTimeout) - fut.onComplete { - case Success(str) => println("future success") - case Failure(ex) => println("future failure") - } + // This should complete successfully + val result = timeout.awaitResult(fut) - println("sleeping") - Thread.sleep(50) - println("Future complete: " + fut.isCompleted.toString() + ", " + fut.value.toString()) + assert(result.nonEmpty) - println("Caught TimeoutException: " + - intercept[TimeoutException] { - //timeout.awaitResult(fut) // prints RpcTimeout description twice - Await.result(fut, 10 millis) - }.getMessage() - ) + // Ask with delayed response + fut = sleepyActor.ask("goodbye")(timeout.duration).mapTo[String]. + recover(timeout.addMessageIfTimeout) - /* - val ref = env.setupEndpoint("test_future", new RpcEndpoint { - override val rpcEnv = env + // Allow future to complete with failure using plain Await.result, this will return + // once the future is complete + val msg1 = + intercept[RpcTimeoutException] { + Await.result(fut, 200 millis) + }.getMessage() - override def receive = { - case _ => - } - }) - val conf = new SparkConf() - val newRpcEnv = new AkkaRpcEnvFactory().create( - RpcEnvConfig(conf, "test", "localhost", 12346, new SecurityManager(conf))) - try { - val newRef = newRpcEnv.setupEndpointRef("local", ref.address, "test_future") - val akkaActorRef = newRef.asInstanceOf[AkkaRpcEndpointRef].actorRef + assert(msg1.contains(shortProp)) - val timeout = new RpcTimeout(1 millis, "spark.rpc.short.timeout") - val fut = akkaActorRef.ask("hello")(timeout.duration).mapTo[String] + // Use RpcTimeout.awaitResult to process Future, since it has already failed with + // RpcTimeoutException, the same exception should be thrown + val msg2 = + intercept[RpcTimeoutException] { + timeout.awaitResult(fut) + }.getMessage() - Thread.sleep(500) - println("Future complete: " + fut.isCompleted.toString() + ", " + fut.value.toString()) + // Ensure description is not in message twice after addMessageIfTimeout and awaitResult + assert(shortProp.r.findAllIn(msg2).length === 1) } finally { - newRpcEnv.shutdown() + system.shutdown() } - */ - - } }