Skip to content

Commit

Permalink
feat(#69): Add TriggerCondition
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Aug 19, 2023
1 parent a288183 commit 088e705
Show file tree
Hide file tree
Showing 17 changed files with 395 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,22 @@ public interface ExecutionContext<OUT> {
*/
long round();

/**
* Runtime trigger context
*
* @see TriggerContext
* @since 2.0.0
*/
@NotNull TriggerContext triggerContext();

/**
* Trigger execution at time
*
* @return triggeredAt
*/
@NotNull Instant triggeredAt();
default @NotNull Instant triggeredAt() {
return triggerContext().triggerAt();
}

/**
* Executed at time
Expand All @@ -45,14 +55,6 @@ public interface ExecutionContext<OUT> {
*/
@NotNull Instant executedAt();

/**
* Runtime trigger context
*
* @see TriggerContext
* @since 2.0.0
*/
@NotNull TriggerContext triggerContext();

/**
* Check whether force stop execution or not
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.trigger.TriggerContext;

/**
* Represents for task result will be pass on each event of {@link SchedulingMonitor}
*
Expand All @@ -25,6 +27,8 @@ public interface ExecutionResult<OUTPUT> {
*/
@Nullable <T> T externalId();

TriggerContext triggerContext();

/**
* Only {@code not null} in {@link SchedulingMonitor#onUnableSchedule(ExecutionResult)}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.ExecutionContext;
import io.github.zero88.schedulerx.ExecutionResult;
import io.github.zero88.schedulerx.JobData;
import io.github.zero88.schedulerx.Scheduler;
import io.github.zero88.schedulerx.Task;
import io.github.zero88.schedulerx.ExecutionContext;
import io.github.zero88.schedulerx.TaskExecutor;
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.ExecutionResult;
import io.github.zero88.schedulerx.Task;
import io.github.zero88.schedulerx.trigger.Trigger;
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
import io.github.zero88.schedulerx.trigger.TriggerCondition.TriggerStatus;
import io.github.zero88.schedulerx.trigger.TriggerContext;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
Expand All @@ -36,7 +37,7 @@
public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements Scheduler<IN, OUT, T> {

@SuppressWarnings("java:S3416")
protected static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutor.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(Scheduler.class);

private final @NotNull Vertx vertx;
private final @NotNull SchedulerStateInternal<OUT> state;
Expand Down Expand Up @@ -129,7 +130,7 @@ public final void executeTask(@NotNull ExecutionContext<OUT> executionContext) {
public final void cancel() {
if (!state.completed()) {
trace(Instant.now(), "Canceling the task");
doStop(state.timerId());
doStop(state.timerId(), TriggerContextFactory.stop(trigger().type(), ReasonCode.STOP_BY_MANUAL));
}
}

Expand All @@ -139,44 +140,63 @@ protected final void doStart(WorkerExecutor workerExecutor) {
.onFailure(this::onUnableSchedule);
}

protected final void doStop(long timerId) {
protected final void doStop(long timerId, TriggerContext context) {
unregisterTimer(timerId);
onCompleted();
onCompleted(context);
}

protected abstract @NotNull Future<Long> registerTimer(@NotNull Promise<Long> promise,
@Nullable WorkerExecutor workerExecutor);

protected void unregisterTimer(long timerId) { vertx.cancelTimer(timerId); }

protected InternalTriggerContext shouldRun(@NotNull Instant triggerAt, @NotNull TriggerContext triggerContext) {
state.increaseTick();
protected final TriggerContext shouldRun(@NotNull TriggerContext triggerContext) {
if (triggerContext.condition().status() != TriggerStatus.INITIALIZED) {
return triggerContext;
}
if (state.pending()) {
return TriggerContextFactory.skip(triggerContext, ReasonCode.TRIGGER_IS_NOT_YET_SCHEDULED);
}
if (state.completed()) {
trace(triggerAt, "The task execution is already completed");
return TriggerContextFactory.skip(triggerContext, ReasonCode.TRIGGER_IS_ALREADY_STOPPED);
}
if (state.executing()) {
onMisfire(triggerAt, "The task is still running");
return TriggerContextFactory.skip(triggerContext, ReasonCode.TASK_IS_RUNNING);
}
return InternalTriggerContext.create(state.idle() && trigger().shouldExecute(triggerAt), triggerContext);
return evaluateTrigger(triggerContext);
}

protected final boolean shouldStop(@Nullable ExecutionContext<OUT> executionContext, long round) {
return (executionContext != null && executionContext.isForceStop()) || trigger().shouldStop(round);
protected final TriggerContext shouldStop(@NotNull TriggerContext triggerContext, boolean isForceStop, long round) {
if (isForceStop) {
return TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_TASK);
}
return trigger().shouldStop(round)
? TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG)
: triggerContext;
}

protected TriggerContext evaluateTrigger(@NotNull TriggerContext triggerContext) {
return trigger().shouldExecute(triggerContext.triggerAt())
? TriggerContextFactory.ready(triggerContext)
: TriggerContextFactory.skip(triggerContext, ReasonCode.CONDITION_IS_NOT_MATCHED);
}

protected final void run(WorkerExecutor workerExecutor, TriggerContext triggerContext) {
final Instant triggerAt = Instant.now();
final InternalTriggerContext internalContext = shouldRun(triggerAt, triggerContext);
if (internalContext.shouldRun()) {
final TriggerContext triggerCtx = TriggerContext.create(internalContext.type(), internalContext.info());
state.increaseTick();
final TriggerContext transitionCtx = shouldRun(triggerContext);
if (transitionCtx.condition().isReady()) {
final ExecutionContextInternal<OUT> ctx = new ExecutionContextImpl<>(vertx, state.increaseRound(),
triggerAt, triggerCtx);
trace(triggerAt, "Trigger the task execution");
transitionCtx);
trace(transitionCtx.triggerAt(), "Trigger the task execution");
if (workerExecutor != null) {
workerExecutor.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), this::onResult);
workerExecutor.executeBlocking(promise -> executeTask(onExecute(promise, ctx)),
asyncResult -> onResult(transitionCtx, asyncResult));
} else {
vertx.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), this::onResult);
vertx.executeBlocking(promise -> executeTask(onExecute(promise, ctx)),
asyncResult -> onResult(transitionCtx, asyncResult));
}
} else {
onMisfire(transitionCtx);
}
}

Expand Down Expand Up @@ -215,45 +235,48 @@ protected final void onUnableSchedule(Throwable t) {
.build());
}

protected final void onMisfire(@NotNull Instant triggerAt, String reason) {
trace(triggerAt, "Skip the execution::" + reason);
protected final void onMisfire(@NotNull TriggerContext triggerContext) {
trace(triggerContext.triggerAt(), "Skip the execution::" + triggerContext.condition().reasonCode());
monitor.onMisfire(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setTick(state.tick())
.setAvailableAt(state.availableAt())
.setTriggeredAt(triggerAt)
.setTriggeredAt(triggerContext.triggerAt())
.setTriggerContext(triggerContext)
.build());
}

@SuppressWarnings("unchecked")
protected final void onResult(@NotNull AsyncResult<Object> asyncResult) {
protected final void onResult(@NotNull TriggerContext triggerContext, @NotNull AsyncResult<Object> asyncResult) {
state.markIdle();
final Instant finishedAt = Instant.now();
if (asyncResult.failed()) {
LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, "Programming error"), asyncResult.cause());
}
ExecutionContextInternal<OUT> executionContext = (ExecutionContextInternal<OUT>) asyncResult.result();
TriggerContext transitionCtx;
if (asyncResult.succeeded()) {
trace(finishedAt, "Received the task result");
final ExecutionContextInternal<OUT> executionCtx = (ExecutionContextInternal<OUT>) asyncResult.result();
monitor.onEach(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setAvailableAt(state.availableAt())
.setTick(state.tick())
.setRound(executionContext.round())
.setTriggeredAt(executionContext.triggeredAt())
.setExecutedAt(executionContext.executedAt())
.setRound(executionCtx.round())
.setTriggeredAt(executionCtx.triggeredAt())
.setExecutedAt(executionCtx.executedAt())
.setFinishedAt(finishedAt)
.setData(state.addData(executionContext.round(), executionContext.data()))
.setError(state.addError(executionContext.round(), executionContext.error()))
.setData(state.addData(executionCtx.round(), executionCtx.data()))
.setError(state.addError(executionCtx.round(), executionCtx.error()))
.setCompleted(state.completed())
.build());
transitionCtx = shouldStop(triggerContext, executionCtx.isForceStop(), state.round());
} else {
LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, "Programming error"), asyncResult.cause());
transitionCtx = shouldStop(triggerContext, false, state.round());
}
if (shouldStop(executionContext, state.round())) {
doStop(state.timerId());
if (transitionCtx.condition().isStop()) {
doStop(state.timerId(), transitionCtx);
}
}

protected final void onCompleted() {
protected final void onCompleted(TriggerContext context) {
state.markCompleted();
final Instant completedAt = Instant.now();
trace(completedAt, "The task execution is completed");
Expand All @@ -266,6 +289,7 @@ protected final void onCompleted() {
.setCompletedAt(completedAt)
.setData(state.lastData())
.setError(state.lastError())
.setTriggerContext(context)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,16 @@ final class ExecutionContextImpl<OUTPUT> implements ExecutionContextInternal<OUT

private final Vertx vertx;
private final long round;
private final Instant triggeredAt;
private final TriggerContext triggerContext;
private Instant executedAt;
private Promise<Object> promise;
private OUTPUT data;
private Throwable error;
private boolean forceStop = false;

public ExecutionContextImpl(Vertx vertx, long round, Instant triggeredAt, TriggerContext triggerContext) {
ExecutionContextImpl(Vertx vertx, long round, TriggerContext triggerContext) {
this.vertx = vertx;
this.round = round;
this.triggeredAt = triggeredAt;
this.triggerContext = triggerContext;
}

Expand All @@ -41,12 +39,10 @@ public ExecutionContextImpl(Vertx vertx, long round, Instant triggeredAt, Trigge

public @NotNull Vertx vertx() { return this.vertx; }

public @NotNull Instant triggeredAt() { return this.triggeredAt; }
public @NotNull TriggerContext triggerContext() { return this.triggerContext; }

public @NotNull Instant executedAt() { return this.executedAt; }

public @NotNull TriggerContext triggerContext() { return this.triggerContext; }

public long round() { return this.round; }

public OUTPUT data() { return this.data; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.ExecutionResult;
import io.github.zero88.schedulerx.trigger.TriggerContext;

final class ExecutionResultImpl<OUTPUT> implements ExecutionResult<OUTPUT> {

Expand All @@ -21,29 +22,34 @@ final class ExecutionResultImpl<OUTPUT> implements ExecutionResult<OUTPUT> {
private final long tick;
private final long round;
private final boolean completed;
private final TriggerContext triggerContext;
private final Throwable error;
private final OUTPUT data;

ExecutionResultImpl(ExecutionResultBuilder<OUTPUT> builder) {
this.unscheduledAt = builder.unscheduledAt;
this.rescheduledAt = builder.rescheduledAt;
this.availableAt = builder.availableAt;
this.triggeredAt = builder.triggeredAt;
this.executedAt = builder.executedAt;
this.finishedAt = builder.finishedAt;
this.completedAt = builder.completedAt;
this.externalId = builder.externalId;
this.tick = builder.tick;
this.round = builder.round;
this.completed = builder.completed;
this.error = builder.error;
this.data = builder.data;
this.unscheduledAt = builder.unscheduledAt;
this.rescheduledAt = builder.rescheduledAt;
this.availableAt = builder.availableAt;
this.triggeredAt = builder.triggeredAt;
this.executedAt = builder.executedAt;
this.finishedAt = builder.finishedAt;
this.completedAt = builder.completedAt;
this.externalId = builder.externalId;
this.tick = builder.tick;
this.round = builder.round;
this.completed = builder.completed;
this.triggerContext = builder.triggerContext;
this.error = builder.error;
this.data = builder.data;
}

@Override
@SuppressWarnings("unchecked")
public <T> @Nullable T externalId() { return (T) this.externalId; }

@Override
public TriggerContext triggerContext() { return this.triggerContext; }

public Instant unscheduledAt() { return this.unscheduledAt; }

public Instant rescheduledAt() { return this.rescheduledAt; }
Expand Down Expand Up @@ -81,6 +87,7 @@ static final class ExecutionResultBuilder<OUTPUT> {
long tick;
long round;
boolean completed;
TriggerContext triggerContext;
Throwable error;
OUTPUT data;

Expand Down Expand Up @@ -139,6 +146,11 @@ public ExecutionResultBuilder<OUTPUT> setCompleted(boolean completed) {
return this;
}

public ExecutionResultBuilder<OUTPUT> setTriggerContext(TriggerContext triggerContext) {
this.triggerContext = triggerContext;
return this;
}

public ExecutionResultBuilder<OUTPUT> setError(Throwable error) {
this.error = error;
return this;
Expand Down

This file was deleted.

Loading

0 comments on commit 088e705

Please sign in to comment.