Skip to content

Commit

Permalink
feat(#105): Add reactive Job interface
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Jan 16, 2024
1 parent cc2d89e commit 9674c25
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 5 deletions.
5 changes: 0 additions & 5 deletions core/src/main/java/io/github/zero88/schedulerx/AsyncJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import org.jetbrains.annotations.NotNull;

import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
import io.vertx.core.Handler;

Expand All @@ -15,7 +13,6 @@
*
* @since 2.0.0
*/
@VertxGen
public interface AsyncJob<INPUT, OUTPUT> extends Job<INPUT, OUTPUT> {

/**
Expand All @@ -32,7 +29,6 @@ public interface AsyncJob<INPUT, OUTPUT> extends Job<INPUT, OUTPUT> {
* @see JobData
* @see ExecutionContext
*/
@GenIgnore
@Override
default void execute(@NotNull JobData<INPUT> jobData, @NotNull ExecutionContext<OUTPUT> executionContext) {
asyncExecute(jobData, executionContext).onSuccess(executionContext::complete).onFailure(executionContext::fail);
Expand All @@ -52,7 +48,6 @@ default void execute(@NotNull JobData<INPUT> jobData, @NotNull ExecutionContext<
* @see JobData
* @see ExecutionContext
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
Future<OUTPUT> asyncExecute(@NotNull JobData<INPUT> jobData, @NotNull ExecutionContext<OUTPUT> executionContext);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.github.zero88.schedulerx;

import java.util.concurrent.CompletionStage;

import org.jetbrains.annotations.NotNull;

import io.vertx.core.Future;

/**
* Job interface for java concurrent {@link CompletionStage}.
*
* @since 2.0.0
*/
public interface CompletionStageJob<INPUT, OUTPUT>
extends FuturableJob<INPUT, OUTPUT, CompletionStage<OUTPUT>, ExecutionContext<OUTPUT>> {

@Override
default ExecutionContext<OUTPUT> transformContext(@NotNull ExecutionContext<OUTPUT> executionContext) {
return executionContext;
}

@Override
default Future<OUTPUT> transformResult(CompletionStage<OUTPUT> result) {
return Future.fromCompletionStage(result);
}

CompletionStage<OUTPUT> doAsync(@NotNull JobData<INPUT> jobData,
@NotNull ExecutionContext<OUTPUT> executionContext);

}
54 changes: 54 additions & 0 deletions core/src/main/java/io/github/zero88/schedulerx/FuturableJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package io.github.zero88.schedulerx;

import org.jetbrains.annotations.NotNull;

import io.vertx.core.Future;

/**
* An interface supports reactive version for {@code Job}.
* <p/>
* This interface bridges you to write a new {@code Job} that based on your flavor async coding style and the reactive
* library such as <a href="https://reactivex.io/">#reactivex</a> or
* <a href="https://smallrye.io/smallrye-mutiny">#mutiny</a>
*
* @param <INPUT> Type of job input
* @param <OUTPUT> Type of job output
* @param <R> Type of {@code Rxified} execution result
* @param <CTX> Type of {@code Rxified} execution context
* @since 2.0.0
*/
public interface FuturableJob<INPUT, OUTPUT, R, CTX> extends AsyncJob<INPUT, OUTPUT> {

@Override
default Future<OUTPUT> asyncExecute(@NotNull JobData<INPUT> jobData,
@NotNull ExecutionContext<OUTPUT> executionContext) {
return transformResult(doAsync(jobData, transformContext(executionContext)));
}

/**
* Transform execution context to {@code Rxified} execution context
*
* @param executionContext job execution context
* @return {@code Rxified} execution context
* @see ExecutionContext
*/
CTX transformContext(@NotNull ExecutionContext<OUTPUT> executionContext);

/**
* Transform the {@code Rxified} execution result to {@code Vert.x} {@link Future} version
*
* @param result {@code Rxified} execution result
* @return the execution result in {@link Future}
*/
Future<OUTPUT> transformResult(R result);

/**
* Async execute
*
* @param jobData job data
* @param executionContext job execution context
* @return the {@code Rxified} execution result
*/
R doAsync(@NotNull JobData<INPUT> jobData, @NotNull CTX executionContext);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.github.zero88.schedulerx.mutiny;

import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.FuturableJob;
import io.github.zero88.schedulerx.JobData;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.vertx.UniHelper;
import io.vertx.core.Future;

/**
* Job interface for {@link Uni} in <a href="https://smallrye.io/smallrye-mutiny">#mutiny</a> version.
*
* @since 2.0.0
*/
public interface MutinyJob<INPUT, OUTPUT> extends FuturableJob<INPUT, OUTPUT, Uni<OUTPUT>, ExecutionContext<OUTPUT>> {

@Override
default ExecutionContext<OUTPUT> transformContext(
@NotNull io.github.zero88.schedulerx.ExecutionContext<OUTPUT> executionContext) {
return ExecutionContext.newInstance(executionContext);
}

@Override
default Future<OUTPUT> transformResult(Uni<OUTPUT> result) { return UniHelper.toFuture(result); }

Uni<OUTPUT> asyncExecute(@NotNull JobData<INPUT> jobData,
@NotNull io.github.zero88.schedulerx.mutiny.ExecutionContext<OUTPUT> executionContext);

}
15 changes: 15 additions & 0 deletions core/src/main/java/io/github/zero88/schedulerx/rxjava3/Rx3Job.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.github.zero88.schedulerx.rxjava3;

import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.FuturableJob;

interface Rx3Job<INPUT, OUTPUT, T> extends FuturableJob<INPUT, OUTPUT, T, ExecutionContext<OUTPUT>> {

@Override
default ExecutionContext<OUTPUT> transformContext(
io.github.zero88.schedulerx.@NotNull ExecutionContext<OUTPUT> executionContext) {
return ExecutionContext.newInstance(executionContext);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.github.zero88.schedulerx.rxjava3;

import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.JobData;
import io.reactivex.rxjava3.core.Maybe;
import io.vertx.core.Future;
import io.vertx.rxjava3.MaybeHelper;

/**
* Job interface for {@link Maybe} in <a href="https://reactivex.io/">#reactivex</a> version.
*
* @since 2.0.0
*/
public interface Rx3MaybeJob<INPUT, OUTPUT> extends Rx3Job<INPUT, OUTPUT, Maybe<OUTPUT>> {

@Override
default Future<OUTPUT> transformResult(Maybe<OUTPUT> result) { return MaybeHelper.toFuture(result); }

Maybe<OUTPUT> asyncExecute(@NotNull JobData<INPUT> jobData,
@NotNull io.github.zero88.schedulerx.rxjava3.ExecutionContext<OUTPUT> executionContext);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.github.zero88.schedulerx.rxjava3;

import org.jetbrains.annotations.NotNull;

import io.github.zero88.schedulerx.JobData;
import io.reactivex.rxjava3.core.Single;
import io.vertx.core.Future;
import io.vertx.rxjava3.SingleHelper;

/**
* Job interface for {@link Single} in <a href="https://reactivex.io/">#reactivex</a> version.
*
* @since 2.0.0
*/
public interface Rx3SingleJob<INPUT, OUTPUT> extends Rx3Job<INPUT, OUTPUT, Single<OUTPUT>> {

@Override
default Future<OUTPUT> transformResult(Single<OUTPUT> result) { return SingleHelper.toFuture(result); }

Single<OUTPUT> asyncExecute(@NotNull JobData<INPUT> jobData,
@NotNull io.github.zero88.schedulerx.rxjava3.ExecutionContext<OUTPUT> executionContext);

}

0 comments on commit 9674c25

Please sign in to comment.