-
Notifications
You must be signed in to change notification settings - Fork 160
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Why RxJava was chosen instead of e.g. Reactor? #255
Comments
I'm also kinda curious about this. I would guess because of Android compatibility. Personally I would also love a reactor API. |
Sorry for the delayed response. RxJava was chosen because of Android compatibility and expressive types like As RxJava and Reactor are both reactive-streams compliant, it is really easy to write an adapter. Example for an interface: public interface Mqtt5ReactorClient extends Mqtt5Client {
static @NotNull Mqtt5ReactorClient from(final @NotNull Mqtt5Client client) {
return new MqttReactorClient(client.toRx());
}
default @NotNull Mono<Mqtt5ConnAck> connect() {
return connect(MqttConnect.DEFAULT);
}
@NotNull Mono<Mqtt5ConnAck> connect(@NotNull Mqtt5Connect connect);
default @NotNull Mqtt5ConnectBuilder.Nested<Mono<Mqtt5ConnAck>> connectWith() {
return new MqttConnectBuilder.Nested<>(this::connect);
}
@NotNull Mono<Mqtt5SubAck> subscribe(@NotNull Mqtt5Subscribe subscribe);
default @NotNull Mqtt5SubscribeBuilder.Nested.Start<Mono<Mqtt5SubAck>> subscribeWith() {
return new MqttSubscribeBuilder.Nested<>(this::subscribe);
}
@NotNull Flux<Mqtt5Publish> subscribeStream(@NotNull Mqtt5Subscribe subscribe);
default @NotNull Mqtt5SubscribeBuilder.Nested.Start<Flux<Mqtt5Publish>> subscribeStreamWith() {
return new MqttSubscribeBuilder.Nested<>(this::subscribeStream);
}
@NotNull Flux<Mqtt5Publish> publishes(@NotNull MqttGlobalPublishFilter filter);
@NotNull Mono<Mqtt5UnsubAck> unsubscribe(@NotNull Mqtt5Unsubscribe unsubscribe);
default @NotNull Mqtt5UnsubscribeBuilder.Nested.Start<Mono<Mqtt5UnsubAck>> unsubscribeWith() {
return new MqttUnsubscribeBuilder.Nested<>(this::unsubscribe);
}
@NotNull Flux<Mqtt5PublishResult> publish(@NotNull Publisher<Mqtt5Publish> publishFlowable);
@NotNull Mono<Void> reauth();
default @NotNull Mono<Void> disconnect() {
return disconnect(MqttDisconnect.DEFAULT);
}
@NotNull Mono<Void> disconnect(@NotNull Mqtt5Disconnect disconnect);
default @NotNull Mqtt5DisconnectBuilder.Nested<Mono<Void>> disconnectWith() {
return new MqttDisconnectBuilder.Nested<>(this::disconnect);
}
} Example for an implementation: public class MqttReactorClient implements Mqtt5ReactorClient {
private final @NotNull Mqtt5RxClient delegate;
public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {
this.delegate = delegate;
}
public @NotNull Mono<Mqtt5ConnAck> connect(final @NotNull Mqtt5Connect connect) {
return Mono.fromDirect(delegate.connect(connect).toFlowable());
}
public @NotNull Mono<Mqtt5SubAck> subscribe(final @NotNull Mqtt5Subscribe subscribe) {
return Mono.fromDirect(delegate.subscribe(subscribe).toFlowable());
}
public @NotNull Flux<Mqtt5Publish> subscribeStream(final @NotNull Mqtt5Subscribe subscribe) {
return Flux.from(delegate.subscribeStream(subscribe));
}
public @NotNull Flux<Mqtt5Publish> publishes(final @NotNull MqttGlobalPublishFilter filter) {
return Flux.from(delegate.publishes(filter));
}
public @NotNull Mono<Mqtt5UnsubAck> unsubscribe(final @NotNull Mqtt5Unsubscribe unsubscribe) {
return Mono.fromDirect(delegate.unsubscribe(unsubscribe).toFlowable());
}
public @NotNull Flux<Mqtt5PublishResult> publish(final @NotNull Publisher<Mqtt5Publish> publishFlowable) {
return Flux.from(delegate.publish(Flowable.fromPublisher(publishFlowable)));
}
public @NotNull Mono<Void> reauth() {
return Mono.fromDirect(delegate.reauth().toFlowable());
}
public @NotNull Mono<Void> disconnect(final @NotNull Mqtt5Disconnect disconnect) {
return Mono.fromDirect(delegate.disconnect(disconnect).toFlowable());
}
@Override
public @NotNull Mqtt5ClientConfig getConfig() {
return delegate.getConfig();
}
@Override
public @NotNull Mqtt5RxClient toRx() {
return delegate;
}
@Override
public @NotNull Mqtt5AsyncClient toAsync() {
return delegate.toAsync();
}
@Override
public @NotNull Mqtt5BlockingClient toBlocking() {
return delegate.toBlocking();
}
} We could directly integrate this into the library if the dependency on reactor can be made optional, so nobody is forced to have rxjava and reactor. |
Thanks for the answer, I think it would be quite nice if one could choose between RxJava and Reactor. |
I'm just curious and I would like to know why you have chosen RxJava to implement the reactive API in your nice little library, and you didn't choose for example Reactor?
This is not to critisize RxJava but I would just like to know if there were some specific resasons.
Thanks in advance!
The text was updated successfully, but these errors were encountered: