Skip to content

Commit

Permalink
Merge pull request #80 from zero88/feature/evaluate-trigger-in-blocki…
Browse files Browse the repository at this point in the history
…ng-thread

Feature/evaluate trigger in blocking thread
  • Loading branch information
zero88 authored Oct 28, 2023
2 parents 041b845 + a51e550 commit 62a1afd
Show file tree
Hide file tree
Showing 32 changed files with 676 additions and 424 deletions.
1 change: 1 addition & 0 deletions core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies {
codeGenerator(VertxLibs.rx3)
codeGenerator(MutinyLibs.core)

testImplementation(LogLibs.logback)
testImplementation(TestLibs.junit5Params)
testImplementation(JacksonLibs.databind)
testImplementation(JacksonLibs.jsr310)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.github.zero88.schedulerx;

import java.time.Instant;
import java.util.Objects;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -45,9 +44,7 @@ public interface ExecutionContext<OUT> {
*
* @return triggeredAt
*/
default @NotNull Instant triggeredAt() {
return Objects.requireNonNull(triggerContext().triggerAt());
}
@NotNull Instant triggeredAt();

/**
* Executed at time
Expand All @@ -64,20 +61,20 @@ public interface ExecutionContext<OUT> {
boolean isForceStop();

/**
* Mark a flag stop/cancel to cancel executor
* Mark a force stop flag to unregister the trigger out of the system timer
*/
void forceStopExecution();

/**
* Completed execution with result data per each round
* Notify finish an execution per each round with its result data
*
* @param data object data
* @apiNote if task is {@code async} then it should be invoked in handling async result stage
*/
void complete(@Nullable OUT data);

/**
* Failed execution with error per each round
* Notify finish an execution per each round with its error data
*
* @param throwable execution error
* @apiNote if task is {@code async} then it should be invoked in handling async result stage
Expand Down
83 changes: 52 additions & 31 deletions core/src/main/java/io/github/zero88/schedulerx/ExecutionResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import io.github.zero88.schedulerx.trigger.TriggerContext;

/**
* Represents for task result will be pass on each event of {@link SchedulingMonitor}
* Represents for an execution result will be pass on each event of {@link SchedulingMonitor}
*
* @param <OUTPUT> Type of task result data
* @param <OUTPUT> Type of execution result
* @apiNote This interface is renamed from {@code TaskResult} since {@code 2.0.0}
* @see SchedulingMonitor
* @since 2.0.0
Expand All @@ -27,93 +27,112 @@ public interface ExecutionResult<OUTPUT> {
*/
@Nullable <T> T externalId();

/**
* Identify the current trigger information in the execution runtime.
*
* @return the trigger context
*/
TriggerContext triggerContext();

/**
* Only {@code not null} in {@link SchedulingMonitor#onUnableSchedule(ExecutionResult)}
* Identify the trigger that cannot be scheduled on the system timer at a clock time.
*
* @return unschedule at time
* @see SchedulingMonitor#onUnableSchedule(ExecutionResult)
*/
Instant unscheduledAt();

/**
* Only {@code not null} if reschedule {@link SchedulingMonitor#onUnableSchedule(ExecutionResult)}
* Identify the trigger is registered on the system timer at a clock time.
*
* @return reschedule at time
* @see #isReschedule()
* @return available at time
* @see SchedulingMonitor#onSchedule(ExecutionResult)
*/
Instant availableAt();

/**
* Identify the trigger is re-scheduled on the system timer at a clock time, only in case of trigger type is
* {@code cron}.
*
* @return rescheduled at time
* @see SchedulingMonitor#onSchedule(ExecutionResult)
*/
Instant rescheduledAt();

/**
* Task executor is available to run, in other words "task is scheduled at a time"
* Identify the system timer fires the trigger at a clock time.
*
* @return available at time
* @return fired at time
* @see SchedulingMonitor#onEach(ExecutionResult)
* @see SchedulingMonitor#onMisfire(ExecutionResult)
*/
Instant availableAt();
Instant firedAt();

/**
* Task executor triggers task round at time
* Identify the trigger is ready to run new execution round at a clock time.
*
* @return task round triggered at time
* @return triggered at time
* @see SchedulingMonitor#onEach(ExecutionResult)
*/
Instant triggeredAt();

/**
* Task executor executes task round at time
* Identify the trigger task that is started to execute at a clock time.
*
* @return task round executed at time
* @return executed at time
* @see SchedulingMonitor#onEach(ExecutionResult)
*/
Instant executedAt();

/**
* Task executor finishes task round at time
* Identify the trigger tick time is already processed at a clock time, regardless its status is misfire or its task
* is executed.
*
* @return task round finished at time
* @return finished at time
* @see SchedulingMonitor#onEach(ExecutionResult)
* @see SchedulingMonitor#onMisfire(ExecutionResult)
*/
Instant finishedAt();

/**
* Task executor completes at time
* Identify the trigger is completed at a clock time, that means no more is fired by the system timer.
*
* @return completed at time
* @see SchedulingMonitor#onCompleted(ExecutionResult)
*/
Instant completedAt();

/**
* The execution tick
* The current number of times that the system timer fires the trigger. This value can be greater than
* {@link #round()} due to misfire.
*
* @return the execution tick. It can be greater than {@code round} due to misfire
* @return the tick
* @apiNote The time at which the number of rounds is changed is {@link #firedAt()}
*/
long tick();

/**
* The execution round
* The current number of times that the trigger's task is executed.
*
* @return the execution round
* @return the round
* @apiNote The time at which the number of rounds is changed is {@link #triggeredAt()}
*/
long round();

/**
* Task result data per each round or latest result data when {@code isCompleted = true}
* Task result data per each round or latest result data if trigger is completed.
*
* @return task result data, might be null
*/
@Nullable OUTPUT data();

/**
* Task result error per each round or latest result error when {@code isCompleted = true}
* Task result error per each round or latest result error if trigger is completed.
*
* @return task result error, might be null
*/
@Nullable Throwable error();

/**
* Identify task executor is completed by cancel event or reach to limit round
*
* @return {@code true} if completed
*/
boolean isCompleted();

/**
* Identify task execution is error or not
*
Expand All @@ -122,10 +141,12 @@ public interface ExecutionResult<OUTPUT> {
default boolean isError() { return Objects.nonNull(error()); }

/**
* Identify task is reschedule or not
* Check whether the trigger is re-registered in the system timer or not after the trigger is available, only in
* case of trigger type is {@code cron}.
*
* @return {@code true} if reschedule
* @see #rescheduledAt()
*/
default boolean isReschedule() { return Objects.nonNull(rescheduledAt()) && round() > 0; }
default boolean isReschedule() { return Objects.nonNull(rescheduledAt()) && tick() > 0; }

}
36 changes: 10 additions & 26 deletions core/src/main/java/io/github/zero88/schedulerx/SchedulerState.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,73 +16,57 @@
public interface SchedulerState<OUT> {

/**
* Timer id
*
* @return timer id
* @return The system timer id
*/
long timerId();

/**
* Identifies an executor is on scheduler at time
* Identify the trigger is registered on the system timer at a clock time.
*
* @return available at time
*/
Instant availableAt();

/**
* Current number of times that trigger is fired
*
* @return tick
* @return The current number of times that the system timer fires the trigger.
*/
long tick();

/**
* Current number of times that trigger is executed
*
* @return round
* @return The current number of times that the trigger is executed.
*/
long round();

/**
* Check whether executor is in {@code pending} state that means is not in a {@code scheduler}
* Check whether the trigger is in {@code pending} state that means is not yet registered in a {@code scheduler}.
*
* @return {@code true} if pending
*/
boolean pending();

/**
* Check whether {@code executor} is in executing state
* Check whether the trigger is in processing: validation phase or execution phase.
*
* @return {@code true} if in executing
* @return {@code true} if in progress
*/
boolean executing();

/**
* Check whether {@code executor} is {@code idle} state that means is in {@code scheduler} but in {@code break-time}
* between 2 executions
*
* @return {@code true} if idle
*/
default boolean idle() {
return !executing() && !completed() && !pending();
}

/**
* Check whether {@code executor} is {@code completed} state that means safe to remove out of a {@code scheduler}
* Check whether the trigger is {@code completed} state that means safe to remove out of a {@code scheduler}.
*
* @return {@code true} if completed
*/
boolean completed();

/**
* Latest data of previous round
* Get the data of latest round.
*
* @return latest data
*/
@Nullable OUT lastData();

/**
* Latest error of previous round
* Get the error of latest round.
*
* @return latest error
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package io.github.zero88.schedulerx;

import static io.github.zero88.schedulerx.impl.Utils.brackets;

import java.text.MessageFormat;

import org.jetbrains.annotations.NotNull;

import io.vertx.core.impl.logging.Logger;
Expand All @@ -16,46 +20,59 @@ public interface SchedulingLogMonitor<OUT> extends SchedulingMonitor<OUT> {

Logger LOGGER = LoggerFactory.getLogger(SchedulingLogMonitor.class);

static <OUT> SchedulingLogMonitor<OUT> create() {
return new SchedulingLogMonitor<OUT>() { };
}
static <OUT> SchedulingLogMonitor<OUT> create() { return new SchedulingLogMonitor<>() { }; }

@Override
default void onUnableSchedule(@NotNull ExecutionResult<OUT> result) {
LOGGER.error(
"Task[" + result.externalId() + "] is unable to schedule at[" + result.unscheduledAt() + "] due to error",
result.error());
LOGGER.error(MessageFormat.format("Trigger{0} is unable to schedule at {1}{2}{3}",
brackets(result.triggerContext().type() + "::" + result.externalId()),
brackets(result.tick() + "/" + result.round()),
brackets("unscheduledAt|" + result.unscheduledAt()),
brackets("cause|" + result.triggerContext().condition().cause())));
}

@Override
default void onSchedule(@NotNull ExecutionResult<OUT> result) {
if (result.isReschedule()) {
LOGGER.debug(
"Task[" + result.externalId() + "] is rescheduled at[" + result.rescheduledAt() + "] after round[" +
result.round() + "]");
LOGGER.debug(MessageFormat.format("Trigger{0} has been rescheduled at {1}{2}",
brackets(result.triggerContext().type() + "::" + result.externalId()),
brackets(result.tick() + "/" + result.round()),
brackets("rescheduledAt|" + result.rescheduledAt())));
} else {
LOGGER.debug("Task[" + result.externalId() + "] is scheduled at[" + result.availableAt() + "]");
LOGGER.debug(MessageFormat.format("Trigger{0} has been registered at {1}{2}",
brackets(result.triggerContext().type() + "::" + result.externalId()),
brackets("-/-"), brackets("availableAt|" + result.availableAt())));
}
}

@Override
default void onMisfire(@NotNull ExecutionResult<OUT> result) {
LOGGER.debug(
"Task[" + result.externalId() + "] is misfire at tick[" + result.tick() + "] in round[" + result.round() +
"] at[" + result.triggeredAt() + "]");
LOGGER.debug(MessageFormat.format("Trigger{0} has been misfire at {1}{2}{3}{4}",
brackets(result.triggerContext().type() + "::" + result.externalId()),
brackets(result.tick() + "/" + result.round()),
brackets("firedAt|" + result.firedAt()),
brackets("finishedAt|" + result.finishedAt()),
brackets("reason|" + result.triggerContext().condition().reasonCode())));
}

@Override
default void onEach(@NotNull ExecutionResult<OUT> result) {
LOGGER.debug("Task[" + result.externalId() + "] has been executed in round[" + result.round() + "] startedAt[" +
result.executedAt() + "] - endedAt[" + result.finishedAt() + "] - Error[" + result.isError() +
"]");
LOGGER.debug(MessageFormat.format("Trigger{0} has been executed at {1}{2}{3}{4}{5}",
brackets(result.triggerContext().type() + "::" + result.externalId()),
brackets(result.tick() + "/" + result.round()),
brackets("firedAt|" + result.firedAt()),
brackets("triggerAt|" + result.triggeredAt()),
brackets("startedAt|" + result.executedAt()),
brackets("endedAt|" + result.finishedAt())));
}

@Override
default void onCompleted(@NotNull ExecutionResult<OUT> result) {
LOGGER.debug("Task[" + result.externalId() + "] is completed in round[" + result.round() + "] at[" +
result.completedAt() + "]");
LOGGER.debug(MessageFormat.format("Trigger{0} has been completed at {1}{2}{3}",
brackets(result.triggerContext().type() + "::" + result.externalId()),
brackets(result.tick() + "/" + result.round()),
brackets("completedAt|" + result.completedAt()),
brackets("reason|" + result.triggerContext().condition().reasonCode())));
}

}
Loading

0 comments on commit 62a1afd

Please sign in to comment.