Skip to content
This repository has been archived by the owner on Apr 8, 2023. It is now read-only.

Commit

Permalink
eventbus: Multithreading support
Browse files Browse the repository at this point in the history
  • Loading branch information
Owain94 committed Nov 25, 2019
1 parent b2a609b commit 675c305
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

import com.jakewharton.rxrelay2.PublishRelay;
import com.jakewharton.rxrelay2.Relay;
import io.reactivex.ObservableTransformer;
import io.reactivex.Scheduler;
import io.reactivex.annotations.NonNull;
import io.reactivex.annotations.Nullable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.schedulers.Schedulers;
import io.sentry.Sentry;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -15,6 +19,7 @@
import lombok.extern.slf4j.Slf4j;
import net.runelite.api.events.Event;
import net.runelite.client.RuneLiteProperties;
import net.runelite.client.callback.ClientThread;
import net.runelite.client.config.OpenOSRSConfig;

@Slf4j
Expand All @@ -28,6 +33,9 @@ public class EventBus implements EventBusInterface
@Inject
private OpenOSRSConfig openOSRSConfig;

@Inject
private ClientThread clientThread;

@NonNull
private <T extends Event> Relay<Object> getSubject(Class<T> eventClass)
{
Expand All @@ -47,45 +55,78 @@ private CompositeDisposable getCompositeDisposable(@NonNull Object object)
return compositeDisposable;
}

@Override
// Subscribe on lifecycle (for example from plugin startUp -> shutdown)
public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action)
private <T> ObservableTransformer<T, T> applyTake(int until)
{
if (subscriptionList.containsKey(lifecycle) && eventClass.equals(subscriptionList.get(lifecycle)))
return observable -> until > 0 ? observable.take(until) : observable;
}

private Scheduler getScheduler(EventScheduler scheduler)
{
Scheduler subscribeScheduler;
switch (scheduler)
{
return;
case COMPUTATION:
subscribeScheduler = Schedulers.computation();
break;
case IO:
subscribeScheduler = Schedulers.io();
break;
case NEWTHREAD:
subscribeScheduler = Schedulers.newThread();
break;
case SINGLE:
subscribeScheduler = Schedulers.single();
break;
case TRAMPOLINE:
subscribeScheduler = Schedulers.trampoline();
break;
case CLIENT:
subscribeScheduler = Schedulers.from(clientThread);
break;
case DEFAULT:
default:
subscribeScheduler = null;
break;
}

Disposable disposable = getSubject(eventClass)
.filter(Objects::nonNull) // Filter out null objects, better safe than sorry
.cast(eventClass) // Cast it for easier usage
.subscribe(action, error ->
{
log.error("Exception in eventbus", error);
return subscribeScheduler;
}

if (RuneLiteProperties.getLauncherVersion() != null && openOSRSConfig.shareLogs())
{
Sentry.capture(error);
}
});
private <T> ObservableTransformer<T, T> applyScheduler(EventScheduler eventScheduler, boolean subscribe)
{
Scheduler scheduler = getScheduler(eventScheduler);

getCompositeDisposable(lifecycle).add(disposable);
subscriptionList.put(lifecycle, eventClass);
return observable -> scheduler == null ? observable : subscribe ? observable.subscribeOn(scheduler) : observable.observeOn(scheduler);
}

@Override
public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUntil)
public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action)
{
subscribe(eventClass, lifecycle, action, -1, EventScheduler.DEFAULT, EventScheduler.DEFAULT);
}

@Override
public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUtil)
{
subscribe(eventClass, lifecycle, action, takeUtil, EventScheduler.DEFAULT, EventScheduler.DEFAULT);
}

@Override
// Subscribe on lifecycle (for example from plugin startUp -> shutdown)
public <T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUntil, @Nullable EventScheduler subscribe, @Nullable EventScheduler observe)
{
if (subscriptionList.containsKey(lifecycle) && eventClass.equals(subscriptionList.get(lifecycle)))
{
return;
}

Disposable disposable = getSubject(eventClass)
.compose(applyTake(takeUntil))
.filter(Objects::nonNull) // Filter out null objects, better safe than sorry
.cast(eventClass) // Cast it for easier usage
.take(takeUntil)
.doFinally(() -> unregister(lifecycle))
.compose(applyScheduler(subscribe, true))
.compose(applyScheduler(observe, false))
.subscribe(action, error ->
{
log.error("Exception in eventbus", error);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public interface EventBusInterface

<T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUntil);

<T extends Event> void subscribe(Class<T> eventClass, @NonNull Object lifecycle, @NonNull Consumer<T> action, int takeUntil, EventScheduler subscribe, EventScheduler observe);

void unregister(@NonNull Object lifecycle);

<T extends Event> void post(Class<? extends T> eventClass, @NonNull T event);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package net.runelite.client.eventbus;

import io.reactivex.annotations.Nullable;

public enum EventScheduler
{
DEFAULT(null),
COMPUTATION("computation"),
IO("io"),
NEWTHREAD("newThread"),
SINGLE("single"),
TRAMPOLINE("trampoline"),
CLIENT("client");

public final String scheduler;

EventScheduler(@Nullable String scheduler)
{
this.scheduler = scheduler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,9 @@
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface Subscribe {}
public @interface Subscribe
{
int takeUntil() default -1;
EventScheduler subscribe() default EventScheduler.DEFAULT;
EventScheduler observe() default EventScheduler.DEFAULT;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import lombok.Value;
import net.runelite.api.events.Event;
import net.runelite.client.eventbus.EventBus;
import net.runelite.client.eventbus.EventScheduler;
import net.runelite.client.eventbus.Subscribe;

public abstract class Plugin implements Module
Expand All @@ -62,7 +63,7 @@ protected void shutDown() throws Exception
@SuppressWarnings("unchecked")
final void addAnnotatedSubscriptions(EventBus eventBus)
{
annotatedSubscriptions.forEach(sub -> eventBus.subscribe(sub.type, annotatedSubsLock, sub.method));
annotatedSubscriptions.forEach(sub -> eventBus.subscribe(sub.type, annotatedSubsLock, sub.method, sub.takeUntil, sub.subscribe, sub.observe));
}

final void removeAnnotatedSubscriptions(EventBus eventBus)
Expand All @@ -76,8 +77,11 @@ private Set<Subscription> findSubscriptions()

for (Method method : this.getClass().getDeclaredMethods())
{
if (method.getAnnotation(Subscribe.class) == null)
Subscribe annotation = method.getAnnotation(Subscribe.class);
if (annotation == null)
{
continue;
}

assert method.getParameterCount() == 1 : "Methods annotated with @Subscribe should have only one parameter";

Expand All @@ -88,7 +92,7 @@ private Set<Subscription> findSubscriptions()

method.setAccessible(true);

Subscription sub = new Subscription(type.asSubclass(Event.class), event -> method.invoke(this, event));
Subscription sub = new Subscription(type.asSubclass(Event.class), event -> method.invoke(this, event), annotation.takeUntil(), annotation.subscribe(), annotation.observe());

builder.add(sub);
}
Expand All @@ -101,5 +105,8 @@ private static class Subscription
{
private final Class type;
private final Consumer method;
private final int takeUntil;
private final EventScheduler subscribe;
private final EventScheduler observe;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private void onSessionOpen(SessionOpen event)
applyWorld();
}

@Subscribe
@Subscribe(takeUntil = 2)
private void onGameStateChanged(GameStateChanged event)
{
if (event.getGameState() == GameState.LOGGED_IN)
Expand Down

0 comments on commit 675c305

Please sign in to comment.