Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed schedule race and task retention with ExecutorScheduler. #2907

Merged
merged 1 commit into from
May 13, 2015

Conversation

akarnokd
Copy link
Member

Fixes a race condition with the timed schedule (first potentially overwriting the result of the untimed schedule in mas) and a scheduled task retention problem due to not tracking those.

@akarnokd akarnokd added the Bug label Apr 23, 2015
// note that since we schedule the emission of potentially multiple tasks
// there is no clear way to cancel this schedule from individual tasks
// so even if executor is an ExecutorService, we can't associate the future
// returned by submit() with any particular ScheduledAction
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is somewhat confusing to me. The "potentially multiple tasks" is just that we're scheduling the "this" reference over and over again isn't it?

We are scheduling a single "this" one at a time so that it then drains from the queue to ensure sequential execution.

Thus, the scheduling on the Executor has little to do with any particular Action0. Is that what you're saying?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the scenario: I submit 2 tasks concurrently. The first one will start the schedule with this. Now let's assume the executor is busy so this is waiting in the pool's queue. If I cancel the 2 tasks, in theory, there is no need to run this anymore but to cancel it, one needs very complicated accounting. So in other terms, the best we can do here is to do the emission loop and check if the particular ScheduledAction is unsubscribed or not. The downside is the long retention of such tasks.

@akarnokd
Copy link
Member Author

I've discovered another retention problem and updated the PR to fix it as well.

@benjchristensen
Copy link
Member

Thanks for the explanation.

benjchristensen added a commit that referenced this pull request May 13, 2015
Fixed schedule race and task retention with ExecutorScheduler.
@benjchristensen benjchristensen merged commit e1d50e4 into ReactiveX:1.x May 13, 2015
@benjchristensen benjchristensen mentioned this pull request May 19, 2015
@akarnokd akarnokd deleted the SchedulerFromFix branch September 9, 2015 15:35
@zhiyanshao
Copy link

@akarnokd @benjchristensen

Is this fixed? I still see the following exception on 1.0.14. Below is the stack from our service's log:

ERROR [2015-09-23 20:54:00,640] com.mycompany.mobile.dropwizard.exception.AbstractExceptionMapper: Error handling request:
! java.util.concurrent.RejectedExecutionException: Task rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker@5e731de2 rejected from java.util.concurrent.ThreadPoolExecutor@500df7b6[Running, pool size = 25, active threads = 5, queued tasks = 25, completed tasks = 6741988]
! at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_51]
! at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) ~[na:1.8.0_51]
! at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) ~[na:1.8.0_51]
! at com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:550) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.schedulers.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:78) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.internal.operators.OperatorSubscribeOn$1.onNext(OperatorSubscribeOn.java:57) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.internal.operators.OperatorSubscribeOn$1.onNext(OperatorSubscribeOn.java:43) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable$2.call(Observable.java:162) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable$2.call(Observable.java:154) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable$2.call(Observable.java:162) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable$2.call(Observable.java:154) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable$2.call(Observable.java:162) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable$2.call(Observable.java:154) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable$2.call(Observable.java:162) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable$2.call(Observable.java:154) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable.subscribe(Observable.java:7804) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.Observable.subscribe(Observable.java:7772) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:432) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at rx.observables.BlockingObservable.first(BlockingObservable.java:159) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.mycompany.mobile.common.concurrency.ObservableUtil.asList(ObservableUtil.java:27) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.mycompany.mobile.token.PushDeviceTokenResource.getTokenByDeviceToken(PushDeviceTokenResource.java:261) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source) ~[na:na]
! at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_51]
! at java.lang.reflect.Method.invoke(Method.java:497) ~[na:1.8.0_51]
! at com.sun.jersey.spi.container.JavaMethodInvokerFactory$1.invoke(JavaMethodInvokerFactory.java:60) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.model.method.dispatch.AbstractResourceMethodDispatchProvider$ResponseOutInvoker._dispatch(AbstractResourceMethodDispatchProvider.java:205) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.model.method.dispatch.ResourceJavaMethodDispatcher.dispatch(ResourceJavaMethodDispatcher.java:75) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.codahale.metrics.jersey.InstrumentedResourceMethodDispatchProvider$ExceptionMeteredRequestDispatcher.dispatch(InstrumentedResourceMethodDispatchProvider.java:69) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at io.dropwizard.jersey.guava.OptionalResourceMethodDispatchAdapter$OptionalRequestDispatcher.dispatch(OptionalResourceMethodDispatchAdapter.java:37) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.uri.rules.HttpMethodRule.accept(HttpMethodRule.java:302) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.uri.rules.ResourceObjectRule.accept(ResourceObjectRule.java:100) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.uri.rules.RightHandPathRule.accept(RightHandPathRule.java:147) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.uri.rules.RootResourceClassesRule.accept(RootResourceClassesRule.java:84) ~[push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1542) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.application.WebApplicationImpl._handleRequest(WebApplicationImpl.java:1473) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1419) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.server.impl.application.WebApplicationImpl.handleRequest(WebApplicationImpl.java:1409) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.spi.container.servlet.WebComponent.service(WebComponent.java:409) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:540) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.sun.jersey.spi.container.servlet.ServletContainer.service(ServletContainer.java:715) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at javax.servlet.http.HttpServlet.service(HttpServlet.java:770) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at io.dropwizard.jetty.NonblockingServletHolder.handle(NonblockingServletHolder.java:49) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1515) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.servlets.UserAgentFilter.doFilter(UserAgentFilter.java:83) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.servlets.GzipFilter.doFilter(GzipFilter.java:348) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at io.dropwizard.jetty.BiDiGzipFilter.doFilter(BiDiGzipFilter.java:127) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1486) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at io.dropwizard.servlets.ThreadNameFilter.doFilter(ThreadNameFilter.java:29) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1486) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at io.dropwizard.jersey.filter.AllowedMethodsFilter.handle(AllowedMethodsFilter.java:44) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at io.dropwizard.jersey.filter.AllowedMethodsFilter.doFilter(AllowedMethodsFilter.java:39) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1486) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:519) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1097) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:448) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1031) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:136) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at com.codahale.metrics.jetty9.InstrumentedHandler.handle(InstrumentedHandler.java:175) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at io.dropwizard.jetty.RoutingHandler.handle(RoutingHandler.java:51) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.handler.RequestLogHandler.handle(RequestLogHandler.java:92) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:162) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.Server.handle(Server.java:446) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:271) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:246) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.io.AbstractConnection$ReadCallback.run(AbstractConnection.java:358) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:601) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:532) [push-token-server-0.0.99-shaded.jar:0.0.99]
! at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]

@akarnokd
Copy link
Member Author

@zhiyanshao You seem to have a different problem than what this PR fixes. Either your pool has been shutdown in the middle or you are running with a bounded internal queue that gets overflown.

@zhiyanshao
Copy link

@akarnokd , thank you for your reply.

Do you know under what circumstances the pool will be shutdown in the middle and how I can tell if I am running with a bounded internal queue? Are these two scenarios by design and can be fixed in my code?

@akarnokd
Copy link
Member Author

It appears you are using some Google threadpool-helper classes. Look where the Executor or ExecutorService is configured in your project.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants