
[SPARK-6980] [CORE] Akka timeout exceptions indicate which conf controls them (RPC Layer) #6205

Closed · wants to merge 41 commits

Conversation

BryanCutler (Member)

Latest changes after refactoring to the RPC layer. I rebased against trunk to make sure to get any recent changes since it had been a while. I wasn't crazy about the name ConfigureTimeout, and RpcTimeout seemed to fit better, but I'm open to suggestions!

I ran most of the tests and they pass, but others would get stuck with "WARN TaskSchedulerImpl: Initial job has not accepted any resources". I think it's just my machine, so I thought I would push what I have anyway.

Still left to do:

  • I only added a couple unit tests so far, there are probably some more cases to test
  • Make sure all uses require an RpcTimeout
  • Right now, both the ask and Await.result use the same timeout, should we differentiate between these in the TimeoutException message?
  • I wrapped Await.result in RpcTimeout, should we also wrap Await.ready?
  • Proper scoping of classes and methods

@hardmettle, feel free to help out with any of these!

@squito (Contributor)

squito commented May 16, 2015

ok to test

} catch {
  case _: Throwable =>
    RpcTimeout(conf, "spark.network.timeout", "120s")
squito (Contributor):

Can you change this to catch a much narrower exception?

Or even better -- how about you move this into RpcTimeout.apply: have it take a prioritized list of properties to check, and have it check the conf for each of them, avoiding any need for exception handling?

BryanCutler (Member Author):

I like the second idea, I'll add that in another commit

@hardmettle

@BryanCutler I will be checking out your PR changes and then carrying forward what you've done by adding RpcTimeout at every usage, and will add test cases. In my opinion, ask and Await.result should share the same timeout to keep things simple (at most we can append a static message: <timeout description message> + "at ask() / Await.result()"). I also feel that, along with Await.result, Await.ready should be covered. It would be better if @squito takes a call on this.

@hardmettle

@BryanCutler can I get write access to your PR repo? I want to merge my changes; how do I do it?

@BryanCutler (Member Author)

@hardmettle are you able to check out my branch? Then maybe you could share any changes/additions as a pull request?

* @param timeoutPropList prioritized list of property keys for the timeout in seconds
* @param defaultValue default timeout value in seconds if no properties found
*/
def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
BryanCutler (Member Author):

Creating RpcTimeout from a prioritized list of property keys, per @squito's earlier request.
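A minimal sketch of how that prioritized lookup could work (illustrative only, not necessarily the final body); it assumes Spark's existing SparkConf.getOption and Utils.timeStringAsSeconds helpers and the RpcTimeout(duration: FiniteDuration, timeoutProp: String) class shown later in this thread:

    import scala.concurrent.duration._

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    object RpcTimeout {
      // Sketch only: the merged implementation may differ.
      def apply(conf: SparkConf, timeoutPropList: Seq[String], defaultValue: String): RpcTimeout = {
        require(timeoutPropList.nonEmpty)
        // Walk the keys in priority order and take the first one set in the conf;
        // if none are set, fall back to the default value, attributed to the
        // highest-priority key so the timeout message still names a property.
        val (prop, value) = timeoutPropList.view
          .flatMap(key => conf.getOption(key).map(key -> _))
          .headOption
          .getOrElse(timeoutPropList.head -> defaultValue)
        new RpcTimeout(Utils.timeStringAsSeconds(value).seconds, prop)
      }
    }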

@squito (Contributor)

squito commented May 21, 2015

@hardmettle I don't think there are any occurrences of Await.ready that are relevant here -- where they do occur, they aren't tied to a configuration variable, the wait is infinite, or it's in test code, etc.

Harsh Gupta and others added 3 commits May 22, 2015 01:52
[SPARK-6980] [CORE] [WIP] Creating wrapper for Akka timeout exceptions to get better information using conf (RPC Layer)
@squito (Contributor)

squito commented May 29, 2015

@BryanCutler @hardmettle just checking if there is any update here; let me know when it's ready for another look. It seemed like it was getting close; the only "big" thing left was what to do with actorSelection.resolveOne.

@hardmettle

@squito I have been banging my head on this. Any suggestions for improvements?

@BryanCutler (Member Author)

Hey guys, I've been playing around with making the futures fail before calling await; here is a test case that does it:

test("Future failure with RpcTimeout") {

    class EchoActor extends Actor {
      def receive: Receive = {
        case msg =>
          Thread.sleep(50)
          sender() ! msg
      }
    }

    val system = ActorSystem("EchoSystem")
    val echoActor = system.actorOf(Props(new EchoActor), name = "echoA")

    val timeout = new RpcTimeout(50 millis, "spark.rpc.short.timeout")

    val fut = echoActor.ask("hello")(10 millis).mapTo[String].recover {
      case te: TimeoutException => throw timeout.amend(te)
    }

    fut.onFailure {
      case te: TimeoutException => println("failed with timeout exception")
    }

    fut.onComplete {
      case Success(str) => println("future success")
      case Failure(ex) => println("future failure")
    }

    println("sleeping")
    Thread.sleep(50)
    println("Future complete: " + fut.isCompleted.toString() + ", " + fut.value.toString())

    println("Caught TimeoutException: " +
      intercept[TimeoutException] {
        //timeout.awaitResult(fut)  // will print RpcTimeout description twice
        Await.result(fut, 10 millis)
      }.getMessage()
    )
}

If we use recover, then when a future fails we can check for a TimeoutException and re-raise the exception with our modified message as before, and we wouldn't need to do it in Await.result anymore. It also doesn't change the type of Future returned. I kind of think this is a better solution because it would handle all cases of Future timeouts, regardless of whether Await.result is used. @squito, @hardmettle - what do you guys think?

@squito (Contributor)

squito commented May 31, 2015

@BryanCutler haven't looked at this closely yet, but quick question -- are you talking about using this just for actorSelection.resolveOne? Or are you proposing doing this in place of what you already have?

@BryanCutler (Member Author)

@squito I was thinking to use this in place of what we have, but actually it wouldn't cover the case where Await.result times out while the future is still not completed. To cover all cases we could add a function in RpcTimeout that would take a Future[T] as input and use recover (as in the code above) to handle a timeout on the future. Then we would have to subclass TimeoutException, as you suggested, so we don't end up amending the message twice. If you want, I could code this up to see how it would look?

One thing I'm not too sure of though, when using recover it requires an ExecutionContext. In the test code I posted, it was fine to import the implicit conversion to the default global context, but I don't know if the same holds true for the rest of Spark. Any thoughts on this?

@squito (Contributor)

squito commented Jun 2, 2015

Hi @BryanCutler,
I finally took a closer look at this. I hadn't realized until you brought it up that even ask takes one timeout on the original call, and then another one in Await.result. If the timeout on the original ask expires, you get an AskTimeoutException, e.g. akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://EchoSystem/user/echoA#-553815755]] after [50 ms], but if it's the timeout in Await.result, you just get a TimeoutException, e.g. java.util.concurrent.TimeoutException: Futures timed out after [50 milliseconds]. For example:

val fut = echoActor.ask("hello")(50 millis)
val inBetweenWait = (15 millis)
Thread.sleep(inBetweenWait.toMillis)
val result = Await.result(fut, 50 millis)

for me, the behavior seems to flip-flop somewhere around inBetweenWait = 15 millis, but that will most likely vary. Even if the ask timeout is shorter than the Await timeout, I might still get the exception from Await.

In any case, the reason I'm bringing this up is that we're sort of in the same scenario with actorSelection.resolveOne (looks like that timeout is just passed on to ask).

Your suggestion of using recover works for handling that exception from ask or resolveOne etc., basically to handle a timeout that might already be baked into a Future. I'd like to keep what you already have, since (a) we still need something else for Await.result b/c that might time out first (as you've noted) and (b) it's a lot simpler to understand, in my opinion.

I think it's worth adding your suggestion as well for those remaining cases -- it's a nice trick :). I didn't see any existing uses for those methods; most likely the resulting Futures will just get wrapped in an Await.result with the same timeout, but it would still be nice to have that in place. I think just (a) throwing your own subclass of TimeoutException to avoid double appending and (b) having timeout.addMessageIfTimeout(future) (or whatever you want to call it) take in an ExecutionContext seems fine. (You can see that scala.concurrent.ExecutionContext.Implicits.global is imported elsewhere in AkkaRpcEnv.)

phew that was a mouthful. hope this all makes sense
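A rough sketch of that combination, assuming the duration/timeoutProp fields shown later in this thread; the names RpcTimeoutException and addMessageIfTimeout come from the suggestion above, and the body is illustrative rather than the final implementation:

    import java.util.concurrent.TimeoutException

    import scala.concurrent.{ExecutionContext, Future}
    import scala.concurrent.duration.FiniteDuration

    // Subclass TimeoutException so an already-amended failure can be recognized
    // and the config-property hint is never appended twice.
    private[spark] class RpcTimeoutException(message: String, cause: TimeoutException)
      extends TimeoutException(message) {
      initCause(cause)
    }

    // Sketch only: field names follow the class definition quoted below.
    private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String) {

      // Wrap a Future so that a plain TimeoutException is re-thrown with the
      // controlling config property appended to its message.
      def addMessageIfTimeout[T](future: Future[T])(implicit ec: ExecutionContext): Future[T] =
        future.recover {
          case rte: RpcTimeoutException => throw rte // already amended; pass through
          case te: TimeoutException =>
            throw new RpcTimeoutException(
              s"${te.getMessage}. This timeout is controlled by $timeoutProp", te)
        }
    }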

@BryanCutler (Member Author)

Hi @squito,
Yeah, that all makes sense and I think we are on the same page, so let me put what we've been talking about in another commit and we can take another look. I noticed too that the timeout in ask is throwing an AskTimeoutException; do you think it is ok to change it to just a TimeoutException? I didn't find anywhere explicitly catching AskTimeoutException, so it might not be a problem. Thanks!

@andrewor14 (Contributor)

retest this please

@SparkQA

SparkQA commented Jun 29, 2015

Test build #36033 timed out for PR 6205 at commit 6a1c50d after a configured wait of 175m.

…rnSchedulerBackend although it is not being used, and fixed SparkConfSuite UT to not use deprecated RpcUtils functions
@BryanCutler (Member Author)

Hey @squito, it looks like the last Jenkins test timed out before it even got to RpcEnvSuite; can you trigger the test again?

@@ -47,14 +47,22 @@ object RpcUtils {
  }

  /** Returns the default Spark timeout to use for RPC ask operations. */
  def askRpcTimeout(conf: SparkConf): RpcTimeout = {
Contributor:

private[spark]
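Applied, the suggestion would look something like this (the fallback chain is illustrative: spark.network.timeout and the 120s default appear earlier in this thread, while spark.rpc.askTimeout is an assumed key):

    /** Returns the default Spark timeout to use for RPC ask operations. */
    private[spark] def askRpcTimeout(conf: SparkConf): RpcTimeout = {
      // Sketch: check the ask-specific key first, then the general network timeout.
      RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
    }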

@squito (Contributor)

squito commented Jul 1, 2015

Hi @BryanCutler, sorry for the delay on my end. Thanks for updating. Unfortunately it looks like there are more conflicts with master; you'll need to merge again.

@BryanCutler (Member Author)

No problem, I'll try to resolve these today

@SparkQA

SparkQA commented Jul 1, 2015

Test build #992 timed out for PR 6205 at commit 4e89c75 after a configured wait of 175m.

…e to private[spark] and improved deprecation warning msg
Conflicts:
	core/src/main/scala/org/apache/spark/deploy/Client.scala
	core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
	core/src/main/scala/org/apache/spark/deploy/master/Master.scala
	core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
	core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
	core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
	core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
	core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala
	core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
@BryanCutler (Member Author)

I merged with master, which now mostly calls askWithReply directly. After that, I started seeing an error when running a MasterSuite test that complained about RpcTimeout not being serializable. I changed the class definition to this:

private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)
  extends Serializable {

And the test was able to pass. Does this seem correct, @squito?

@squito (Contributor)

squito commented Jul 2, 2015

@BryanCutler yeah, making it serializable seems fine. I have a feeling that there is probably some field that could be marked transient, but that is not a big deal.

@zsxwing do you want to take one more pass?

@squito (Contributor)

squito commented Jul 2, 2015

Jenkins, retest this please

@zsxwing (Member)

zsxwing commented Jul 2, 2015

Serializable is fine. I forgot to add a test for sending RpcEndpointRef between different RpcEnvs; I will add one in #6457.

@@ -147,7 +146,7 @@ private[spark] object AkkaUtils extends Logging {
   def askWithReply[T](
       message: Any,
       actor: ActorRef,
-      timeout: FiniteDuration): T = {
+      timeout: RpcTimeout): T = {
Member:

I suggest reverting the changes to AkkaUtils. These methods you changed won't be called any more.

Contributor:

hmm, I disagree. No harm in making these changes. If we're really certain AkkaUtils won't be used anymore, then we'll delete it ... but in the meantime, these changes are here just in case.

Maybe it wasn't worth the effort in the first place (my fault!) but I don't see the harm ...

Member:

I'm OK with the changes. Not a big deal.

@zsxwing (Member)

zsxwing commented Jul 2, 2015

LGTM except two minor comments.

@SparkQA

SparkQA commented Jul 2, 2015

Test build #995 has finished for PR 6205 at commit 7bb70f1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ExpectsInputTypes
    • trait AutoCastInputTypes

@SparkQA

SparkQA commented Jul 2, 2015

Test build #996 has finished for PR 6205 at commit 46c8d48.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ExpectsInputTypes
    • trait AutoCastInputTypes

@SparkQA

SparkQA commented Jul 2, 2015

Test build #997 has finished for PR 6205 at commit 46c8d48.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
    • case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String) extends DeployMessage
    • case class RegisterApplication(appDescription: ApplicationDescription, driver: RpcEndpointRef)
    • case class RegisteredApplication(appId: String, master: RpcEndpointRef) extends DeployMessage
    • case class SubmitDriverResponse(
    • case class KillDriverResponse(
    • case class MasterChanged(master: RpcEndpointRef, masterWebUiUrl: String)
    • class DCT(override val uid: String)
    • class MinMaxScaler(override val uid: String)
    • class PCA (override val uid: String) extends Estimator[PCAModel] with PCAParams
    • class StreamingLinearAlgorithm(object):
    • class StreamingLinearRegressionWithSGD(StreamingLinearAlgorithm):
    • class AnalysisException(Exception):
    • case class Cast(child: Expression, dataType: DataType) extends UnaryExpression with Logging
    • trait ExpectsInputTypes
    • trait AutoCastInputTypes
    • trait ExtractValue
    • abstract class ExtractValueWithStruct extends UnaryExpression with ExtractValue
    • abstract class ExtractValueWithOrdinal extends BinaryExpression with ExtractValue
    • class SpecificOrdering extends $
    • class SpecificProjection extends $
    • final class SpecificRow extends $
    • case class Crc32(child: Expression)
    • // compiled class file for the closure here will conflict with the one in callUDF (upper case).

@asfgit asfgit closed this in aa7bbc1 Jul 3, 2015
@squito (Contributor)

squito commented Jul 3, 2015

merged to master.

Thanks for all your hard work on this @BryanCutler! This was probably a lot more work than you initially expected (and me too), but I'm glad you stuck with it; it's a great addition.

@BryanCutler (Member Author)

Awesome, thanks for all your help @squito! That was a heck of a first Jira, but it was a great experience and I definitely learned a lot.

@BryanCutler BryanCutler deleted the configTimeout-6980 branch November 18, 2015 21:37