Skip to content

Commit

Permalink
Merge pull request #73 from zero88/feature/add-trigger-condition-in-r…
Browse files Browse the repository at this point in the history
…esult

feat(#69): Add TriggerCondition
  • Loading branch information
zero88 authored Aug 19, 2023
2 parents a288183 + f56b4eb commit bc2d58d
Show file tree
Hide file tree
Showing 18 changed files with 526 additions and 179 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.github.zero88.schedulerx;

import java.time.Instant;
import java.util.Objects;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -31,12 +32,22 @@ public interface ExecutionContext<OUT> {
*/
long round();

/**
* Runtime trigger context
*
* @see TriggerContext
* @since 2.0.0
*/
@NotNull TriggerContext triggerContext();

/**
* Trigger execution at time
*
* @return triggeredAt
*/
@NotNull Instant triggeredAt();
default @NotNull Instant triggeredAt() {
return Objects.requireNonNull(triggerContext().triggerAt());
}

/**
* Executed at time
Expand All @@ -45,14 +56,6 @@ public interface ExecutionContext<OUT> {
*/
@NotNull Instant executedAt();

/**
* Runtime trigger context
*
* @see TriggerContext
* @since 2.0.0
*/
@NotNull TriggerContext triggerContext();

/**
* Check whether force stop execution or not
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.trigger.TriggerContext;

/**
* Represents for task result will be pass on each event of {@link SchedulingMonitor}
*
Expand All @@ -25,6 +27,8 @@ public interface ExecutionResult<OUTPUT> {
*/
@Nullable <T> T externalId();

TriggerContext triggerContext();

/**
* Only {@code not null} in {@link SchedulingMonitor#onUnableSchedule(ExecutionResult)}
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package io.github.zero88.schedulerx.impl;

import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.jetbrains.annotations.ApiStatus.Internal;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.ExecutionContext;
import io.github.zero88.schedulerx.ExecutionResult;
import io.github.zero88.schedulerx.JobData;
import io.github.zero88.schedulerx.Scheduler;
import io.github.zero88.schedulerx.Task;
import io.github.zero88.schedulerx.ExecutionContext;
import io.github.zero88.schedulerx.TaskExecutor;
import io.github.zero88.schedulerx.SchedulingMonitor;
import io.github.zero88.schedulerx.ExecutionResult;
import io.github.zero88.schedulerx.Task;
import io.github.zero88.schedulerx.trigger.Trigger;
import io.github.zero88.schedulerx.trigger.TriggerCondition.ReasonCode;
import io.github.zero88.schedulerx.trigger.TriggerCondition.TriggerStatus;
import io.github.zero88.schedulerx.trigger.TriggerContext;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
Expand All @@ -36,7 +38,7 @@
public abstract class AbstractScheduler<IN, OUT, T extends Trigger> implements Scheduler<IN, OUT, T> {

@SuppressWarnings("java:S3416")
protected static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutor.class);
protected static final Logger LOGGER = LoggerFactory.getLogger(Scheduler.class);

private final @NotNull Vertx vertx;
private final @NotNull SchedulerStateInternal<OUT> state;
Expand Down Expand Up @@ -129,7 +131,7 @@ public final void executeTask(@NotNull ExecutionContext<OUT> executionContext) {
public final void cancel() {
if (!state.completed()) {
trace(Instant.now(), "Canceling the task");
doStop(state.timerId());
doStop(state.timerId(), TriggerContextFactory.stop(trigger().type(), ReasonCode.STOP_BY_MANUAL));
}
}

Expand All @@ -139,44 +141,63 @@ protected final void doStart(WorkerExecutor workerExecutor) {
.onFailure(this::onUnableSchedule);
}

protected final void doStop(long timerId) {
protected final void doStop(long timerId, TriggerContext context) {
unregisterTimer(timerId);
onCompleted();
onCompleted(context);
}

protected abstract @NotNull Future<Long> registerTimer(@NotNull Promise<Long> promise,
@Nullable WorkerExecutor workerExecutor);

protected void unregisterTimer(long timerId) { vertx.cancelTimer(timerId); }

protected InternalTriggerContext shouldRun(@NotNull Instant triggerAt, @NotNull TriggerContext triggerContext) {
state.increaseTick();
protected final TriggerContext shouldRun(@NotNull TriggerContext triggerContext) {
if (triggerContext.condition().status() != TriggerStatus.INITIALIZED) {
return triggerContext;
}
if (state.pending()) {
return TriggerContextFactory.skip(triggerContext, ReasonCode.NOT_YET_SCHEDULED);
}
if (state.completed()) {
trace(triggerAt, "The task execution is already completed");
return TriggerContextFactory.skip(triggerContext, ReasonCode.ALREADY_STOPPED);
}
if (state.executing()) {
onMisfire(triggerAt, "The task is still running");
return TriggerContextFactory.skip(triggerContext, ReasonCode.TASK_IS_RUNNING);
}
return InternalTriggerContext.create(state.idle() && trigger().shouldExecute(triggerAt), triggerContext);
return evaluateTrigger(triggerContext);
}

protected final boolean shouldStop(@Nullable ExecutionContext<OUT> executionContext, long round) {
return (executionContext != null && executionContext.isForceStop()) || trigger().shouldStop(round);
protected final TriggerContext shouldStop(@NotNull TriggerContext triggerContext, boolean isForceStop, long round) {
if (isForceStop) {
return TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_TASK);
}
return trigger().shouldStop(round)
? TriggerContextFactory.stop(triggerContext, ReasonCode.STOP_BY_CONFIG)
: triggerContext;
}

protected TriggerContext evaluateTrigger(@NotNull TriggerContext triggerContext) {
return trigger().shouldExecute(Objects.requireNonNull(triggerContext.triggerAt()))
? TriggerContextFactory.ready(triggerContext)
: TriggerContextFactory.skip(triggerContext, ReasonCode.CONDITION_IS_NOT_MATCHED);
}

protected final void run(WorkerExecutor workerExecutor, TriggerContext triggerContext) {
final Instant triggerAt = Instant.now();
final InternalTriggerContext internalContext = shouldRun(triggerAt, triggerContext);
if (internalContext.shouldRun()) {
final TriggerContext triggerCtx = TriggerContext.create(internalContext.type(), internalContext.info());
state.increaseTick();
final TriggerContext transitionCtx = shouldRun(triggerContext);
if (transitionCtx.condition().isReady()) {
final ExecutionContextInternal<OUT> ctx = new ExecutionContextImpl<>(vertx, state.increaseRound(),
triggerAt, triggerCtx);
trace(triggerAt, "Trigger the task execution");
transitionCtx);
trace(Objects.requireNonNull(transitionCtx.triggerAt()), "Trigger the task execution");
if (workerExecutor != null) {
workerExecutor.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), this::onResult);
workerExecutor.executeBlocking(promise -> executeTask(onExecute(promise, ctx)),
asyncResult -> onResult(transitionCtx, asyncResult));
} else {
vertx.executeBlocking(promise -> executeTask(onExecute(promise, ctx)), this::onResult);
vertx.executeBlocking(promise -> executeTask(onExecute(promise, ctx)),
asyncResult -> onResult(transitionCtx, asyncResult));
}
} else {
onMisfire(transitionCtx);
}
}

Expand Down Expand Up @@ -206,54 +227,61 @@ protected final void onSchedule(long timerId) {
}

protected final void onUnableSchedule(Throwable t) {
final TriggerContext triggerContext = TriggerContextFactory.failed(trigger.type(),
ReasonCode.FAILED_TO_SCHEDULE, t);
monitor.onUnableSchedule(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setTick(state.tick())
.setRound(state.round())
.setUnscheduledAt(Instant.now())
.setError(t)
.setTriggerContext(triggerContext)
.build());
}

protected final void onMisfire(@NotNull Instant triggerAt, String reason) {
trace(triggerAt, "Skip the execution::" + reason);
protected final void onMisfire(@NotNull TriggerContext triggerContext) {
trace(Objects.requireNonNull(triggerContext.triggerAt()),
"Skip the execution::" + triggerContext.condition().reasonCode());
monitor.onMisfire(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setTick(state.tick())
.setAvailableAt(state.availableAt())
.setTriggeredAt(triggerAt)
.setTriggeredAt(triggerContext.triggerAt())
.setTriggerContext(triggerContext)
.build());
}

@SuppressWarnings("unchecked")
protected final void onResult(@NotNull AsyncResult<Object> asyncResult) {
protected final void onResult(@NotNull TriggerContext triggerContext, @NotNull AsyncResult<Object> asyncResult) {
state.markIdle();
final Instant finishedAt = Instant.now();
if (asyncResult.failed()) {
LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, "Programming error"), asyncResult.cause());
}
ExecutionContextInternal<OUT> executionContext = (ExecutionContextInternal<OUT>) asyncResult.result();
TriggerContext transitionCtx;
if (asyncResult.succeeded()) {
trace(finishedAt, "Received the task result");
final ExecutionContextInternal<OUT> executionCtx = (ExecutionContextInternal<OUT>) asyncResult.result();
monitor.onEach(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setAvailableAt(state.availableAt())
.setTick(state.tick())
.setRound(executionContext.round())
.setTriggeredAt(executionContext.triggeredAt())
.setExecutedAt(executionContext.executedAt())
.setRound(executionCtx.round())
.setTriggerContext(triggerContext)
.setTriggeredAt(executionCtx.triggeredAt())
.setExecutedAt(executionCtx.executedAt())
.setFinishedAt(finishedAt)
.setData(state.addData(executionContext.round(), executionContext.data()))
.setError(state.addError(executionContext.round(), executionContext.error()))
.setData(state.addData(executionCtx.round(), executionCtx.data()))
.setError(state.addError(executionCtx.round(), executionCtx.error()))
.setCompleted(state.completed())
.build());
transitionCtx = shouldStop(triggerContext, executionCtx.isForceStop(), state.round());
} else {
LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, "Programming error"), asyncResult.cause());
transitionCtx = shouldStop(triggerContext, false, state.round());
}
if (shouldStop(executionContext, state.round())) {
doStop(state.timerId());
if (transitionCtx.condition().isStop()) {
doStop(state.timerId(), transitionCtx);
}
}

protected final void onCompleted() {
protected final void onCompleted(TriggerContext context) {
state.markCompleted();
final Instant completedAt = Instant.now();
trace(completedAt, "The task execution is completed");
Expand All @@ -262,6 +290,7 @@ protected final void onCompleted() {
.setAvailableAt(state.availableAt())
.setTick(state.tick())
.setRound(state.round())
.setTriggerContext(context)
.setCompleted(state.completed())
.setCompletedAt(completedAt)
.setData(state.lastData())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,16 @@ final class ExecutionContextImpl<OUTPUT> implements ExecutionContextInternal<OUT

private final Vertx vertx;
private final long round;
private final Instant triggeredAt;
private final TriggerContext triggerContext;
private Instant executedAt;
private Promise<Object> promise;
private OUTPUT data;
private Throwable error;
private boolean forceStop = false;

public ExecutionContextImpl(Vertx vertx, long round, Instant triggeredAt, TriggerContext triggerContext) {
ExecutionContextImpl(Vertx vertx, long round, TriggerContext triggerContext) {
this.vertx = vertx;
this.round = round;
this.triggeredAt = triggeredAt;
this.triggerContext = triggerContext;
}

Expand All @@ -41,12 +39,10 @@ public ExecutionContextImpl(Vertx vertx, long round, Instant triggeredAt, Trigge

public @NotNull Vertx vertx() { return this.vertx; }

public @NotNull Instant triggeredAt() { return this.triggeredAt; }
public @NotNull TriggerContext triggerContext() { return this.triggerContext; }

public @NotNull Instant executedAt() { return this.executedAt; }

public @NotNull TriggerContext triggerContext() { return this.triggerContext; }

public long round() { return this.round; }

public OUTPUT data() { return this.data; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.jetbrains.annotations.Nullable;

import io.github.zero88.schedulerx.ExecutionResult;
import io.github.zero88.schedulerx.trigger.TriggerContext;

final class ExecutionResultImpl<OUTPUT> implements ExecutionResult<OUTPUT> {

Expand All @@ -21,29 +22,34 @@ final class ExecutionResultImpl<OUTPUT> implements ExecutionResult<OUTPUT> {
private final long tick;
private final long round;
private final boolean completed;
private final TriggerContext triggerContext;
private final Throwable error;
private final OUTPUT data;

ExecutionResultImpl(ExecutionResultBuilder<OUTPUT> builder) {
this.unscheduledAt = builder.unscheduledAt;
this.rescheduledAt = builder.rescheduledAt;
this.availableAt = builder.availableAt;
this.triggeredAt = builder.triggeredAt;
this.executedAt = builder.executedAt;
this.finishedAt = builder.finishedAt;
this.completedAt = builder.completedAt;
this.externalId = builder.externalId;
this.tick = builder.tick;
this.round = builder.round;
this.completed = builder.completed;
this.error = builder.error;
this.data = builder.data;
this.unscheduledAt = builder.unscheduledAt;
this.rescheduledAt = builder.rescheduledAt;
this.availableAt = builder.availableAt;
this.triggeredAt = builder.triggeredAt;
this.executedAt = builder.executedAt;
this.finishedAt = builder.finishedAt;
this.completedAt = builder.completedAt;
this.externalId = builder.externalId;
this.tick = builder.tick;
this.round = builder.round;
this.completed = builder.completed;
this.triggerContext = builder.triggerContext;
this.error = builder.error;
this.data = builder.data;
}

@Override
@SuppressWarnings("unchecked")
public <T> @Nullable T externalId() { return (T) this.externalId; }

@Override
public TriggerContext triggerContext() { return this.triggerContext; }

public Instant unscheduledAt() { return this.unscheduledAt; }

public Instant rescheduledAt() { return this.rescheduledAt; }
Expand Down Expand Up @@ -81,6 +87,7 @@ static final class ExecutionResultBuilder<OUTPUT> {
long tick;
long round;
boolean completed;
TriggerContext triggerContext;
Throwable error;
OUTPUT data;

Expand Down Expand Up @@ -139,6 +146,11 @@ public ExecutionResultBuilder<OUTPUT> setCompleted(boolean completed) {
return this;
}

public ExecutionResultBuilder<OUTPUT> setTriggerContext(TriggerContext triggerContext) {
this.triggerContext = triggerContext;
return this;
}

public ExecutionResultBuilder<OUTPUT> setError(Throwable error) {
this.error = error;
return this;
Expand Down
Loading

0 comments on commit bc2d58d

Please sign in to comment.