Skip to content

Commit

Permalink
feat(scheduler): evaluate trigger in blocking thread #80
Browse files Browse the repository at this point in the history
  • Loading branch information
zero88 committed Oct 27, 2023
1 parent f855bc9 commit 368b572
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 137 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.github.zero88.schedulerx;

import java.time.Instant;
import java.util.Objects;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -62,20 +61,20 @@ public interface ExecutionContext<OUT> {
boolean isForceStop();

/**
* Mark a flag stop/cancel to cancel executor
* Mark a force stop flag to unregister the trigger out of the system timer
*/
void forceStopExecution();

/**
* Completed execution with result data per each round
* Notify finish an execution per each round with its result data
*
* @param data object data
* @apiNote if task is {@code async} then it should be invoked in handling async result stage
*/
void complete(@Nullable OUT data);

/**
* Failed execution with error per each round
* Notify finish an execution per each round with its error data
*
* @param throwable execution error
* @apiNote if task is {@code async} then it should be invoked in handling async result stage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import org.jetbrains.annotations.ApiStatus.Internal;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -117,7 +118,7 @@ public final void start(WorkerExecutor workerExecutor) {
@Override
public final void executeTask(@NotNull ExecutionContext<OUT> executionContext) {
try {
log(executionContext.executedAt(), "Start to execute the task");
log(executionContext.executedAt(), "On execute");
task.execute(jobData(), executionContext);
if (!task.isAsync()) {
((ExecutionContextInternal<OUT>) executionContext).internalComplete();
Expand All @@ -136,25 +137,22 @@ public final void cancel() {
}

protected final void doStart(WorkerExecutor workerExecutor) {
this.registerTimer(Promise.promise(), workerExecutor)
.onSuccess(this::onSchedule)
.onFailure(this::onUnableSchedule);
this.registerTimer(workerExecutor).onSuccess(this::onSchedule).onFailure(this::onUnableSchedule);
}

protected final void doStop(long timerId, TriggerContext context) {
unregisterTimer(timerId);
onCompleted(context);
onComplete(context);
}

protected abstract @NotNull Future<Long> registerTimer(@NotNull Promise<Long> promise,
@Nullable WorkerExecutor workerExecutor);
protected abstract @NotNull Future<Long> registerTimer(@Nullable WorkerExecutor workerExecutor);

protected abstract void unregisterTimer(long timerId);

protected final TriggerTransitionContext shouldRun(@NotNull TriggerTransitionContext triggerContext) {
log(Instant.now(), "Evaluating the trigger condition...");
if (triggerContext.condition().status() != TriggerStatus.KICKOFF) {
return triggerContext;
log(Instant.now(), "On evaluate");
if (!triggerContext.isKickoff()) {
throw new IllegalStateException("Trigger condition status must be " + TriggerStatus.KICKOFF);
}
if (state.pending()) {
return TriggerContextFactory.skip(triggerContext, ReasonCode.NOT_YET_SCHEDULED);
Expand Down Expand Up @@ -188,47 +186,51 @@ protected TriggerTransitionContext evaluateTrigger(@NotNull TriggerTransitionCon
: TriggerContextFactory.skip(triggerContext, ReasonCode.CONDITION_IS_NOT_MATCHED);
}

protected final void run(WorkerExecutor workerExecutor, TriggerTransitionContext triggerContext) {
state.increaseTick();
final TriggerTransitionContext transitionCtx = shouldRun(triggerContext);
if (transitionCtx.isReady()) {
final ExecutionContextInternal<OUT> ctx = new ExecutionContextImpl<>(vertx, state.increaseRound(),
transitionCtx);
log(ctx.triggeredAt(), "Triggering the task execution...");
if (workerExecutor != null) {
workerExecutor.executeBlocking(promise -> executeTask(onExecute(promise, ctx)),
asyncResult -> onResult(transitionCtx, asyncResult));
} else {
vertx.executeBlocking(promise -> executeTask(onExecute(promise, ctx)),
asyncResult -> onResult(transitionCtx, asyncResult));
}
} else {
onMisfire(transitionCtx);
}
/**
* Register a timer id and increase tick time when the system timer fires
*
* @param timerId the system timer id
* @return the current number times of the tick counter
*/
protected final long onFire(long timerId) {
return state.timerId(timerId).increaseTick();
}

protected final void log(@NotNull Instant at, @NotNull String event) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(genMsg(state.tick(), state.round(), at, event));
protected final void onRun(WorkerExecutor workerExecutor, TriggerTransitionContext kickoffContext) {
log(Objects.requireNonNull(kickoffContext.firedAt()), "On fire");
this.<TriggerTransitionContext>executeBlocking(workerExecutor, p -> p.complete(shouldRun(kickoffContext)))
.onSuccess(context -> onTrigger(workerExecutor, context))
.onFailure(t -> onMisfire(TriggerContextFactory.skip(kickoffContext, ReasonCode.UNEXPECTED_ERROR, t)));
}

protected final void onTrigger(WorkerExecutor workerExecutor, TriggerTransitionContext triggerContext) {
if (!triggerContext.isReady()) {
onMisfire(triggerContext);
return;
}
final ExecutionContextInternal<OUT> executionContext = new ExecutionContextImpl<>(vertx, triggerContext,
state.increaseRound());
log(executionContext.triggeredAt(), "On trigger", triggerContext.tick(), executionContext.round());
this.executeBlocking(workerExecutor, p -> executeTask(executionContext.setup(p)))
.onComplete(ar -> onResult(triggerContext, ar));
}

protected final void onSchedule(long timerId) {
final TriggerContext ctx = TriggerContextFactory.scheduled(trigger().type());
final TriggerContext context = TriggerContextFactory.scheduled(trigger().type());
ExecutionResult<OUT> result;
if (state.pending()) {
result = ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setAvailableAt(state.timerId(timerId).markAvailable())
.setTriggerContext(ctx)
.setAvailableAt(state.markAvailable())
.setTriggerContext(context)
.build();
} else {
result = ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setAvailableAt(state.availableAt())
.setTriggerContext(context)
.setRescheduledAt(Instant.now())
.setTriggerContext(ctx)
.setTick(state.timerId(timerId).tick())
.setTick(state.tick())
.setRound(state.round())
.build();
}
Expand All @@ -239,46 +241,49 @@ protected final void onUnableSchedule(Throwable cause) {
final TriggerContext ctx = TriggerContextFactory.error(trigger.type(), ReasonCode.FAILED_TO_SCHEDULE, cause);
monitor.onUnableSchedule(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setTriggerContext(ctx)
.setUnscheduledAt(Instant.now())
.setTick(state.tick())
.setRound(state.round())
.setUnscheduledAt(Instant.now())
.setTriggerContext(ctx)
.build());
}

protected final void onMisfire(@NotNull TriggerTransitionContext triggerContext) {
log(Objects.requireNonNull(triggerContext.firedAt()),
"Skip the execution::" + triggerContext.condition().reasonCode());
final Instant finishedAt = state.markFinished(triggerContext.tick());
log(finishedAt, "On misfire::" + triggerContext.condition().reasonCode(), triggerContext.tick());
monitor.onMisfire(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setTick(state.tick())
.setAvailableAt(state.availableAt())
.setFiredAt(triggerContext.firedAt())
.setTriggerContext(triggerContext)
.setTick(triggerContext.tick())
.setFiredAt(triggerContext.firedAt())
.setRound(state.round())
.setFinishedAt(finishedAt)
.build());
}

@SuppressWarnings("unchecked")
protected final void onResult(@NotNull TriggerTransitionContext triggerContext,
@NotNull AsyncResult<Object> asyncResult) {
final Instant finishedAt = state.markIdle();
final Instant finishedAt = state.markFinished(triggerContext.tick());
TriggerTransitionContext transitionCtx;
if (asyncResult.succeeded()) {
log(finishedAt, "Received the task result");
final ExecutionContextInternal<OUT> executionCtx = (ExecutionContextInternal<OUT>) asyncResult.result();
log(finishedAt, "On result", triggerContext.tick(), executionCtx.round());
monitor.onEach(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setAvailableAt(state.availableAt())
.setTick(state.tick())
.setRound(executionCtx.round())
.setTriggerContext(triggerContext)
.setTick(triggerContext.tick())
.setFiredAt(triggerContext.firedAt())
.setRound(executionCtx.round())
.setTriggeredAt(executionCtx.triggeredAt())
.setExecutedAt(executionCtx.executedAt())
.setFinishedAt(finishedAt)
.setData(state.addData(executionCtx.round(), executionCtx.data()))
.setError(state.addError(executionCtx.round(), executionCtx.error()))
.build());
transitionCtx = shouldStop(triggerContext, executionCtx.isForceStop(), state.round());
transitionCtx = shouldStop(triggerContext, executionCtx.isForceStop(), executionCtx.round());
} else {
LOGGER.warn(genMsg(state.tick(), state.round(), finishedAt, "Programming error"), asyncResult.cause());
transitionCtx = shouldStop(triggerContext, false, state.round());
Expand All @@ -288,9 +293,9 @@ protected final void onResult(@NotNull TriggerTransitionContext triggerContext,
}
}

protected final void onCompleted(TriggerContext context) {
protected final void onComplete(TriggerContext context) {
final Instant completedAt = state.markCompleted();
log(completedAt, "The task execution is completed");
log(completedAt, "On complete");
monitor.onCompleted(ExecutionResultImpl.<OUT>builder()
.setExternalId(jobData.externalId())
.setAvailableAt(state.availableAt())
Expand All @@ -303,13 +308,31 @@ protected final void onCompleted(TriggerContext context) {
.build());
}

private ExecutionContextInternal<OUT> onExecute(@NotNull Promise<Object> promise,
@NotNull ExecutionContextInternal<OUT> executionContext) {
return executionContext.setup(promise, state.markExecuting());
protected final void log(@NotNull Instant at, @NotNull String event) {
log(at, event, state.tick(), state.round());
}

protected final void log(@NotNull Instant at, @NotNull String event, long tick) {
log(at, event, tick, state.round());
}

protected final void log(@NotNull Instant at, @NotNull String event, long tick, long round) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(genMsg(tick, round, at, event));
}
}

private String genMsg(long tick, long round, @NotNull Instant at, @NotNull String event) {
return "Scheduling[" + tick + "][" + round + "][" + at + "]::[" + jobData.externalId() + "] - " + event;
String tId = state.pending() ? "-" : String.valueOf(state.timerId());
String tickAndRound = state.pending() ? "-/-" : (tick + "/" + round);
return "Scheduling[" + tId + "::" + trigger.type() + "::" + jobData.externalId() + "][" + tickAndRound + "]" +
"::[" + at + "] - " + event;
}

private <R> Future<R> executeBlocking(WorkerExecutor workerExecutor, Consumer<Promise<R>> operation) {
return workerExecutor == null
? vertx.executeBlocking(operation::accept, false)
: workerExecutor.executeBlocking(operation::accept, false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,50 +21,47 @@ final class ExecutionContextImpl<OUTPUT> implements ExecutionContextInternal<OUT
private Throwable error;
private boolean forceStop = false;

ExecutionContextImpl(Vertx vertx, long round, TriggerContext triggerContext) {
ExecutionContextImpl(Vertx vertx, TriggerContext triggerContext, long round) {
this.vertx = vertx;
this.round = round;
this.triggerContext = triggerContext;
this.triggeredAt = Instant.now();
}

@Override
public @NotNull ExecutionContextInternal<OUTPUT> setup(@NotNull Promise<Object> promise,
@NotNull Instant executedAt) {
public @NotNull ExecutionContextInternal<OUTPUT> setup(@NotNull Promise<Object> promise) {
if (Objects.nonNull(this.promise)) {
throw new IllegalStateException("TaskExecutionContext is already setup");
}
this.promise = promise;
this.executedAt = executedAt;
this.executedAt = Instant.now();
return this;
}

@Override
public @NotNull Vertx vertx() { return vertx; }
public @NotNull Vertx vertx() { return this.vertx; }

@Override
public @NotNull TriggerContext triggerContext() { return triggerContext; }
public @NotNull TriggerContext triggerContext() { return this.triggerContext; }

@Override
public @NotNull Instant triggeredAt() { return triggeredAt; }
public @NotNull Instant triggeredAt() { return this.triggeredAt; }

@Override
public @NotNull Instant executedAt() { return executedAt; }
public @NotNull Instant executedAt() { return this.executedAt; }

@Override
public long round() { return round; }
public long round() { return this.round; }

@Override
public OUTPUT data() { return data; }
public OUTPUT data() { return this.data; }

@Override
public Throwable error() { return error; }
public Throwable error() { return this.error; }

@Override
public boolean isForceStop() { return forceStop; }
public boolean isForceStop() { return this.forceStop; }

@Override
public void forceStopExecution() { forceStop = true; }
public void forceStopExecution() { this.forceStop = true; }

@Override
public void complete(OUTPUT data) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.github.zero88.schedulerx.impl;

import java.time.Instant;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -11,16 +9,13 @@
interface ExecutionContextInternal<OUTPUT> extends ExecutionContext<OUTPUT> {

/**
* Setup task execution context
* Prepare to execute task
*
* @param promise promise
* @param executedAt execution at time
* @param promise promise
* @return a reference to this for fluent API
* @apiNote It will be invoked by system. In any attempts invoking, {@link IllegalStateException} will be
* thrown
* @see Promise
* @apiNote It will be invoked by system. In any attempts invoking, {@link IllegalStateException} will be thrown
*/
@NotNull ExecutionContextInternal<OUTPUT> setup(@NotNull Promise<Object> promise, @NotNull Instant executedAt);
@NotNull ExecutionContextInternal<OUTPUT> setup(@NotNull Promise<Object> promise);

void internalComplete();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ final class SchedulerStateImpl<OUTPUT> implements SchedulerStateInternal<OUTPUT>
private final AtomicLong tick = new AtomicLong(0);
private final AtomicLong round = new AtomicLong(0);
private final AtomicBoolean completed = new AtomicBoolean(false);
private final AtomicBoolean executing = new AtomicBoolean(false);
private final ConcurrentMap<Long, Boolean> inProgress = new ConcurrentHashMap<>();
private final AtomicBoolean pending = new AtomicBoolean(true);
private final AtomicReference<Entry<Long, OUTPUT>> data = new AtomicReference<>(new SimpleEntry<>(0L, null));
Expand Down Expand Up @@ -53,7 +52,11 @@ final class SchedulerStateImpl<OUTPUT> implements SchedulerStateInternal<OUTPUT>

@Override
public long increaseTick() {
return tick.incrementAndGet();
final long current = this.tick.incrementAndGet();
if (!executing()) {
inProgress.put(current, true);
}
return current;
}

@Override
Expand All @@ -75,14 +78,8 @@ public long increaseTick() {
}

@Override
public @NotNull Instant markExecuting() {
executing.set(true);
return Instant.now();
}

@Override
public @NotNull Instant markIdle() {
executing.set(false);
public @NotNull Instant markFinished(long tick) {
inProgress.remove(tick);
return Instant.now();
}

Expand Down
Loading

0 comments on commit 368b572

Please sign in to comment.