Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to check type of Runnable inside the RxJavaPlugins onScheduleHandler #5733

Closed
lukaszguz opened this issue Nov 20, 2017 · 6 comments
Closed

Comments

@lukaszguz
Copy link
Contributor

Hi
RxJava2 version: [2.1.6]
I have problem with RxJavaPlugins.onScheduleHandler.

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
}

At the moment of changing thread pool Runnable is wrapped into DisposeTask so I can't check type of action inside scheduleHandler because DisposeTask is package scope.
Do you have any idea how to solve this problem without reflection? 😀

Here is my sample code: https://github.com/lukaszguz/spring-cloud-sleuth/blob/master/spring-cloud-sleuth-core/src/main/java/org/springframework/cloud/sleuth/instrument/rxjava2/SleuthRxJava2SchedulersHandler.java

@akarnokd
Copy link
Member

Unfortunately you can't; it requires exposing the type or at least some API to unwrap the internal wrappers. I don't have much time this week to work the details out though.

@akarnokd
Copy link
Member

Sketch for a solution:

  1. Define an internal public interface SchedulerRunnableWrapper with a Runnable. getWrappedRunnable().
  2. Implement the interface on every Runnable wrapper in the various Schedulers.
  3. Add a RxJavaPlugins.unwrapRunnable(Object task) which performs an instanceof check and calls getWrappedRunnable().

Names are only suggestions.

@lukaszguz
Copy link
Contributor Author

Awesome :) I will try to write PR based on your tips. Thank you very much for your answer. :)

@artem-zinnatullin
Copy link
Contributor

@lukaszguz I was reviewing your PR and caught myself couple times on the question why would anyone need to unwrap runnable and what useful actions can you actually do with it if original runnable is created by RxJava itself (unless you use Schedulers in a decoupled way from reactive types).

While I'm trying to find answer to this question myself, answer from you would be great :)

I've looked through the tracing code you've posted and not sure I understand the purpose of if (isTraceActionDecoratedByRxWorker). I assume you want to only track "original" action or minimize overhead added by Scheduler/etc?

But JFYI information check output of following simple case:

@Test
public void t() {
    final AtomicInteger scheduleHandlerCallCounter = new AtomicInteger();
    final AtomicInteger isTraceActionDecoratedByRxWorkerCounter = new AtomicInteger();
    final AtomicInteger isTraceActionNotDecoratedByRxWorkerCounter = new AtomicInteger();

    RxJavaPlugins.setScheduleHandler(new Function<Runnable, Runnable>() {
        @Override
        public Runnable apply(Runnable action) throws Exception {
            scheduleHandlerCallCounter.incrementAndGet();

            if (isTraceActionDecoratedByRxWorker(action)) {
                isTraceActionDecoratedByRxWorkerCounter.incrementAndGet();
                return action;
            }

            isTraceActionNotDecoratedByRxWorkerCounter.incrementAndGet();
            return new TraceAction(action);
        }
    });

    Observable
            .fromCallable(() -> "hello")
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .test()
            .awaitTerminalEvent();
    
    System.out.println("scheduleHandlerCallCounter == " + scheduleHandlerCallCounter.get());
    System.out.println("isTraceActionDecoratedByRxWorkerCounter == " + isTraceActionDecoratedByRxWorkerCounter.get());
    System.out.println("isTraceActionNotDecoratedByRxWorkerCounter == " + isTraceActionNotDecoratedByRxWorkerCounter.get());
}

Rest of code is copied from your example.

Output:

scheduleHandlerCallCounter == 3
isTraceActionDecoratedByRxWorkerCounter == 1
isTraceActionNotDecoratedByRxWorkerCounter == 2

So as you can see isTraceActionNotDecoratedByRxWorkerCounter is 2, so your code will end up tracking "original" action twice.

By "original" I mean Runnable closest to action provided by user through some rx operator or directly through Scheduler.schedule*()

// Sorry for holding this, had to deal with some personal stuff

@lukaszguz
Copy link
Contributor Author

Thanks for answer :) I really appreciate your help and advices :)
I show you what I want to achieve added a few logs.

private AtomicInteger counter = new AtomicInteger(1);

public ScheduleHandler(Tracer tracer, TraceKeys traceKeys,
                               List<String> threadsToSample,
                               Function<? super Runnable, ? extends Runnable> delegate) {
            this.tracer = tracer;
            this.traceKeys = traceKeys;
            this.threadsToSample = threadsToSample;
            this.delegate = delegate;
        }
       	@Override
		public Runnable apply(Runnable action) throws Exception {
			if (isTraceActionDecoratedByRxWorker(action)) {
				return action;
			}
			Runnable wrappedAction = this.delegate != null ? this.delegate.apply(action)
					: action;
			return new SleuthRxJava2SchedulersHandler.TraceAction(counter.getAndIncrement(),this.tracer,
					this.traceKeys, wrappedAction, this.threadsToSample);
		}
...
}

static class TraceAction implements Runnable {

        private int id;
        private final Runnable actual;
        private final Tracer tracer;
        private final TraceKeys traceKeys;
        private final Span parent;
        private final List<String> threadsToIgnore;

        public TraceAction(int id, Tracer tracer, TraceKeys traceKeys, Runnable actual,
                           List<String> threadsToIgnore) {
            this.id = id;
            this.tracer = tracer;
            this.traceKeys = traceKeys;
            this.threadsToIgnore = threadsToIgnore;
            this.parent = tracer.getCurrentSpan();
            this.actual = actual;
        }

        @Override
        public void run() {
            String threadName = Thread.currentThread()
                                      .getName();
            // don't create a span if the thread name is on a list of threads to ignore
            for (String threadToIgnore : this.threadsToIgnore) {
                if (threadName.matches(threadToIgnore)) {
                    if (log.isTraceEnabled()) {
                        log.trace(String.format(
                                "Thread with name [%s] matches the regex [%s]. A span will not be created for this Thread.",
                                threadName, threadToIgnore));
                    }
                    this.actual.run();
                    return;
                }
            }
            Span span = this.parent;
            boolean created = false;
            if (span != null) {
                span = this.tracer.continueSpan(span);
                log.info("Continue span " + id);
            }
            else {
                span = this.tracer.createSpan(RXJAVA_COMPONENT);
                this.tracer.addTag(Span.SPAN_LOCAL_COMPONENT_TAG_NAME, RXJAVA_COMPONENT);
                this.tracer.addTag(
                        this.traceKeys.getAsync()
                                      .getPrefix()
                        + this.traceKeys.getAsync()
                                        .getThreadNameKey(),
                        Thread.currentThread()
                              .getName());
                log.info("Created span " + id);
                created = true;
            }
            try {
                this.actual.run();
            } finally {
                if (created) {
                    this.tracer.close(span);
                    log.info("Close span " + id);
                }
                else if (this.tracer.isTracing()) {
                    this.tracer.detach(span);
                    log.info("Detach span " + id);
                }
            }
        }
    }

Results with checking Runnable type using isTraceActionDecoratedByRxWorker:

should_create_new_span_when_rx_java_action_is_executed_and_there_was_no_span:
2017-11-26 22:10:50.520  INFO [bootstrap,ec1c1b586985bad1,ec1c1b586985bad1,true] 8424 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Created span 1
2017-11-26 22:10:50.531  INFO [bootstrap,,,] 8424 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Close span 1
2017-11-26 22:10:50.749  INFO [bootstrap,32114f1689d03fa1,32114f1689d03fa1,true] 8424 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Created span 2
2017-11-26 22:10:50.750  INFO [bootstrap,,,] 8424 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Close span 2

should_create_new_span_when_rx_java_action_is_executed_and_there_was_no_span_and_on_thread_pool_changed:
2017-11-26 22:12:20.831  INFO [bootstrap,6b89c093e75100b4,6b89c093e75100b4,true] 8585 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Created span 1
2017-11-26 22:12:20.837  INFO [bootstrap,,,] 8585 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Close span 1
2017-11-26 22:12:20.838  INFO [bootstrap,6b89c093e75100b4,6b89c093e75100b4,true] 8585 --- [       myPool-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Continue span 2
2017-11-26 22:12:20.838  INFO [bootstrap,6b89c093e75100b4,6b89c093e75100b4,true] 8585 --- [       myPool-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Detach span 2
2017-11-26 22:12:20.838  INFO [bootstrap,6b89c093e75100b4,6b89c093e75100b4,true] 8585 --- [      myPool2-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Continue span 3
2017-11-26 22:12:20.843  INFO [bootstrap,6b89c093e75100b4,6b89c093e75100b4,true] 8585 --- [      myPool2-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Detach span 3

should_continue_current_span_when_rx_java_action_is_executed:
2017-11-26 22:12:51.865  INFO [bootstrap,75def52cdd83d7f4,75def52cdd83d7f4,true] 8678 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Continue span 1
2017-11-26 22:12:51.868  INFO [bootstrap,75def52cdd83d7f4,75def52cdd83d7f4,true] 8678 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Detach span 1

Each Observables, Flowables etc. has correct closed or detached span in Spring Sleuth.
When I've removed if (isTraceActionDecoratedByRxWorker(action)) then order is wrong .

should_create_new_span_when_rx_java_action_is_executed_and_there_was_no_span:
2017-11-26 22:19:25.900  INFO [bootstrap,b4ec51be80a38afc,b4ec51be80a38afc,true] 9120 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Created span 2
2017-11-26 22:19:25.900  INFO [bootstrap,b4ec51be80a38afc,a8e7e7bdcf244250,true] 9120 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Created span 1
2017-11-26 22:19:25.905  INFO [bootstrap,b4ec51be80a38afc,b4ec51be80a38afc,true] 9120 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Close span 1
2017-11-26 22:19:25.907  INFO [bootstrap,,,] 9120 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Close span 2

should_create_new_span_when_rx_java_action_is_executed_and_there_was_no_span_and_on_thread_pool_changed:
2017-11-26 22:20:03.574  INFO [bootstrap,fc503e40978be97d,fc503e40978be97d,true] 9221 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Created span 2
2017-11-26 22:20:03.574  INFO [bootstrap,fc503e40978be97d,d7698b7ad0a61c62,true] 9221 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Created span 1
2017-11-26 22:20:03.591  INFO [bootstrap,fc503e40978be97d,fc503e40978be97d,true] 9221 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Close span 1
2017-11-26 22:20:03.591  INFO [bootstrap,fc503e40978be97d,d7698b7ad0a61c62,true] 9221 --- [       myPool-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Continue span 3
2017-11-26 22:20:03.591  INFO [bootstrap,,,] 9221 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Close span 2
2017-11-26 22:20:03.608  INFO [bootstrap,fc503e40978be97d,d7698b7ad0a61c62,true] 9221 --- [       myPool-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Detach span 3
2017-11-26 22:20:03.608  INFO [bootstrap,fc503e40978be97d,d7698b7ad0a61c62,true] 9221 --- [      myPool2-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Continue span 4
2017-11-26 22:20:03.609  INFO [bootstrap,fc503e40978be97d,d7698b7ad0a61c62,true] 9221 --- [      myPool2-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Detach span 4

should_continue_current_span_when_rx_java_action_is_executed:
2017-11-26 22:20:37.213  INFO [bootstrap,19a9401578edbc83,19a9401578edbc83,true] 9320 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Continue span 2
2017-11-26 22:20:37.213  INFO [bootstrap,19a9401578edbc83,19a9401578edbc83,true] 9320 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Continue span 1
2017-11-26 22:20:37.217  INFO [bootstrap,19a9401578edbc83,19a9401578edbc83,true] 9320 --- [readScheduler-1] s.c.s.i.r.SleuthRxJava2SchedulersHandler : Detach span 1

I can't use any counter or flag which tell me when I have to wrap runnable in TraceAction because I don't have any place when I can check it. :/ The problem is solved when I have information about original type of runnable.

@akarnokd
Copy link
Member

akarnokd commented Dec 4, 2017

Closing via #5734.

@akarnokd akarnokd closed this as completed Dec 4, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants