Skip to content

Commit

Permalink
Adding task cancellation time in task API
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
  • Loading branch information
sgup432 committed May 5, 2023
1 parent 9bf99b4 commit 706fbb8
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,14 @@ public void testTasksCancellation() throws Exception {
.get();
assertEquals(1, cancelTasksResponse.getTasks().size());

// Tasks are marked as cancelled at this point but not yet completed.
List<TaskInfo> taskInfoList = client().admin().cluster().prepareListTasks()
.setActions(TestTaskPlugin.TestTaskAction.NAME + "*").get().getTasks();
for (TaskInfo taskInfo: taskInfoList) {
assertTrue(taskInfo.isCancelled());
assertNotEquals(-1, taskInfo.getCancellationStartTime());
assertNotEquals(-1, taskInfo.getRunningTimeSinceCancellationNanos());
}
future.get();

logger.info("--> checking that test tasks are not running");
Expand Down
18 changes: 18 additions & 0 deletions server/src/main/java/org/opensearch/tasks/CancellableTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ public abstract class CancellableTask extends Task {
private volatile String reason;
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final TimeValue cancelAfterTimeInterval;
/**
* The task's cancellation start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style).
*/
private long cancellationStartTime = -1;
/**
* The task's cancellation start time as a relative time ({@link System#nanoTime()} style).
*/
private long cancellationStartTimeNanos = -1;

public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map<String, String> headers) {
this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT);
Expand All @@ -74,6 +82,8 @@ public CancellableTask(
public void cancel(String reason) {
assert reason != null;
if (cancelled.compareAndSet(false, true)) {
this.cancellationStartTime = System.currentTimeMillis();
this.cancellationStartTimeNanos = System.nanoTime();
this.reason = reason;
onCancelled();
}
Expand All @@ -87,6 +97,14 @@ public boolean cancelOnParentLeaving() {
return true;
}

public long getCancellationStartTime() {
return cancellationStartTime;
}

public long getCancellationStartTimeNanos() {
return cancellationStartTimeNanos;
}

/**
* Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled.
*/
Expand Down
14 changes: 12 additions & 2 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,14 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
* Build a proper {@link TaskInfo} for this task.
*/
protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) {
boolean cancelled = this instanceof CancellableTask && ((CancellableTask) this).isCancelled();
long cancellationStartTime = -1;
long runningTimeSinceCancellationNanos = -1;
if (cancelled) {
cancellationStartTime = ((CancellableTask) this).getCancellationStartTime();
runningTimeSinceCancellationNanos =
System.nanoTime() - ((CancellableTask) this).getCancellationStartTimeNanos();
}
return new TaskInfo(
new TaskId(localNodeId, getId()),
getType(),
Expand All @@ -201,10 +209,12 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status
startTime,
System.nanoTime() - startTimeNanos,
this instanceof CancellableTask,
this instanceof CancellableTask && ((CancellableTask) this).isCancelled(),
cancelled,
parentTask,
headers,
resourceStats
resourceStats,
cancellationStartTime,
runningTimeSinceCancellationNanos
);
}

Expand Down
72 changes: 69 additions & 3 deletions server/src/main/java/org/opensearch/tasks/TaskInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ public final class TaskInfo implements Writeable, ToXContentFragment {

private final boolean cancelled;

private final long cancellationStartTime;

private final long runningTimeSinceCancellationNanos;

private final TaskId parentTaskId;

private final Map<String, String> headers;
Expand All @@ -104,6 +108,26 @@ public TaskInfo(
TaskId parentTaskId,
Map<String, String> headers,
TaskResourceStats resourceStats
) {
this(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, cancelled,
parentTaskId, headers, resourceStats, -1, -1);
}

public TaskInfo(
TaskId taskId,
String type,
String action,
String description,
Task.Status status,
long startTime,
long runningTimeNanos,
boolean cancellable,
boolean cancelled,
TaskId parentTaskId,
Map<String, String> headers,
TaskResourceStats resourceStats,
long cancelledStartTime,
long runningTimeSinceCancellationNanos
) {
if (cancellable == false && cancelled == true) {
throw new IllegalArgumentException("task cannot be cancelled");
Expand All @@ -120,6 +144,8 @@ public TaskInfo(
this.parentTaskId = parentTaskId;
this.headers = headers;
this.resourceStats = resourceStats;
this.cancellationStartTime = cancelledStartTime;
this.runningTimeSinceCancellationNanos = runningTimeSinceCancellationNanos;
}

/**
Expand Down Expand Up @@ -150,6 +176,13 @@ public TaskInfo(StreamInput in) throws IOException {
} else {
resourceStats = null;
}
if (in.getVersion().onOrAfter(Version.V_2_8_0)) {
cancellationStartTime = in.readLong();
runningTimeSinceCancellationNanos = in.readLong();
} else {
cancellationStartTime = -1;
runningTimeSinceCancellationNanos = -1;
}
}

@Override
Expand All @@ -170,6 +203,13 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_1_0)) {
out.writeOptionalWriteable(resourceStats);
}
if (out.getVersion().onOrAfter(Version.V_2_6_0)) {
out.writeLong(cancellationStartTime);
out.writeLong(runningTimeSinceCancellationNanos);
} else {
out.writeLong(-1);
out.writeLong(-1);
}
}

public TaskId getTaskId() {
Expand Down Expand Up @@ -228,6 +268,14 @@ public boolean isCancelled() {
return cancelled;
}

public long getCancellationStartTime() {
return cancellationStartTime;
}

public long getRunningTimeSinceCancellationNanos() {
return runningTimeSinceCancellationNanos;
}

/**
* Returns the parent task id
*/
Expand Down Expand Up @@ -281,6 +329,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
resourceStats.toXContent(builder, params);
builder.endObject();
}
if (cancellationStartTime != -1) {
builder.field("cancelled_at_millis", cancellationStartTime);
builder.field("running_time_since_cancellation_nanos", runningTimeSinceCancellationNanos);
}
return builder;
}

Expand Down Expand Up @@ -308,6 +360,12 @@ public static TaskInfo fromXContent(XContentParser parser) {
}
@SuppressWarnings("unchecked")
TaskResourceStats resourceStats = (TaskResourceStats) a[i++];
long cancellationStartTime = -1;
long runningTimeSinceCancellationNanos = -1;
if (cancelled) {
cancellationStartTime = (Long) a[i++];
runningTimeSinceCancellationNanos = (Long) a[i++];
}
RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes);
TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString);
return new TaskInfo(
Expand All @@ -322,7 +380,9 @@ public static TaskInfo fromXContent(XContentParser parser) {
cancelled,
parentTaskId,
headers,
resourceStats
resourceStats,
cancellationStartTime,
runningTimeSinceCancellationNanos
);
});
static {
Expand All @@ -341,6 +401,8 @@ public static TaskInfo fromXContent(XContentParser parser) {
PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers"));
PARSER.declareObject(optionalConstructorArg(), (p, c) -> TaskResourceStats.fromXContent(p), new ParseField("resource_stats"));
PARSER.declareLong(optionalConstructorArg(), new ParseField("cancelled_at_millis"));
PARSER.declareLong(optionalConstructorArg(), new ParseField("running_time_since_cancellation_nanos"));
}

@Override
Expand All @@ -366,7 +428,9 @@ public boolean equals(Object obj) {
&& Objects.equals(cancelled, other.cancelled)
&& Objects.equals(status, other.status)
&& Objects.equals(headers, other.headers)
&& Objects.equals(resourceStats, other.resourceStats);
&& Objects.equals(resourceStats, other.resourceStats)
&& Objects.equals(cancellationStartTime, other.cancellationStartTime)
&& Objects.equals(runningTimeSinceCancellationNanos, other.runningTimeSinceCancellationNanos);
}

@Override
Expand All @@ -383,7 +447,9 @@ public int hashCode() {
cancelled,
status,
headers,
resourceStats
resourceStats,
cancellationStartTime,
runningTimeSinceCancellationNanos
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ public void testCancellableOptionWhenCancelledTrue() {
long taskId = randomIntBetween(0, 100000);
long startTime = randomNonNegativeLong();
long runningTime = randomNonNegativeLong();
long cancellationStartTime = randomNonNegativeLong();
long runningTimeSinceCancellationNanos = randomNonNegativeLong();
boolean cancellable = true;
boolean cancelled = true;
TaskInfo taskInfo = new TaskInfo(
Expand All @@ -103,12 +105,16 @@ public void testCancellableOptionWhenCancelledTrue() {
cancelled,
TaskId.EMPTY_TASK_ID,
Collections.singletonMap("foo", "bar"),
randomResourceStats(randomBoolean())
randomResourceStats(randomBoolean()),
cancellationStartTime,
runningTimeSinceCancellationNanos
);
String taskInfoString = taskInfo.toString();
Map<String, Object> map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2();
assertEquals(map.get("cancellable"), cancellable);
assertEquals(map.get("cancelled"), cancelled);
assertEquals(map.get("cancelled_at_millis"), cancellationStartTime);
assertEquals(map.get("running_time_since_cancellation_nanos"), runningTimeSinceCancellationNanos);
}

public void testCancellableOptionWhenCancelledFalse() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void testEmptyToString() {
assertEquals("{\n" + " \"tasks\" : [ ]\n" + "}", new ListTasksResponse(null, null, null).toString());
}

// Here
public void testNonEmptyToString() {
TaskInfo info = new TaskInfo(
new TaskId("node1", 1),
Expand All @@ -78,7 +79,10 @@ public void testNonEmptyToString() {
{
put("dummy-type1", new TaskResourceUsage(100, 100));
}
})
}),
0,
1

);
ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList());
assertEquals(
Expand All @@ -105,7 +109,9 @@ public void testNonEmptyToString() {
+ " \"cpu_time_in_nanos\" : 100,\n"
+ " \"memory_in_bytes\" : 100\n"
+ " }\n"
+ " }\n"
+ " },\n"
+ " \"cancelled_at_millis\" : 0,\n"
+ " \"running_time_since_cancellation_nanos\" : 1\n"
+ " }\n"
+ " ]\n"
+ "}",
Expand Down
48 changes: 45 additions & 3 deletions server/src/test/java/org/opensearch/tasks/TaskInfoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ protected Predicate<String> getRandomFieldsExcludeFilter() {

@Override
protected TaskInfo mutateInstance(TaskInfo info) {
switch (between(0, 10)) {
switch (between(0, 12)) {
case 0:
TaskId taskId = new TaskId(info.getTaskId().getNodeId() + randomAlphaOfLength(5), info.getTaskId().getId());
return new TaskInfo(
Expand Down Expand Up @@ -266,6 +266,40 @@ protected TaskInfo mutateInstance(TaskInfo info) {
info.getHeaders(),
new TaskResourceStats(resourceUsageMap)
);
case 11:
return new TaskInfo(
info.getTaskId(),
info.getType(),
info.getAction(),
info.getDescription(),
info.getStatus(),
info.getStartTime(),
info.getRunningTimeNanos(),
info.isCancellable(),
info.isCancelled(),
info.getParentTaskId(),
info.getHeaders(),
info.getResourceStats(),
info.getCancellationStartTime() + between(1, 100),
info.getRunningTimeSinceCancellationNanos()
);
case 12:
return new TaskInfo(
info.getTaskId(),
info.getType(),
info.getAction(),
info.getDescription(),
info.getStatus(),
info.getStartTime(),
info.getRunningTimeNanos(),
true,
true,
info.getParentTaskId(),
info.getHeaders(),
info.getResourceStats(),
info.getCancellationStartTime(),
info.getRunningTimeSinceCancellationNanos() + between(1, 100)
);
default:
throw new IllegalStateException();
}
Expand All @@ -285,6 +319,12 @@ static TaskInfo randomTaskInfo(boolean detailed) {
long runningTimeNanos = randomLong();
boolean cancellable = randomBoolean();
boolean cancelled = cancellable == true ? randomBoolean() : false;
long cancellationStartTime = -1;
long runningTimeSinceCancellationNanos = -1;
if (cancelled) {
cancellationStartTime = randomLong();
runningTimeSinceCancellationNanos = randomLong();
}
TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId();
Map<String, String> headers = randomBoolean()
? Collections.emptyMap()
Expand All @@ -301,7 +341,9 @@ static TaskInfo randomTaskInfo(boolean detailed) {
cancelled,
parentTaskId,
headers,
randomResourceStats(detailed)
randomResourceStats(detailed),
cancellationStartTime,
runningTimeSinceCancellationNanos
);
}

Expand All @@ -312,7 +354,7 @@ private static TaskId randomTaskId() {
private static RawTaskStatus randomRawTaskStatus() {
try (XContentBuilder builder = XContentBuilder.builder(Requests.INDEX_CONTENT_TYPE.xContent())) {
builder.startObject();
int fields = between(0, 10);
int fields = between(0, 12);
for (int f = 0; f < fields; f++) {
builder.field(randomAlphaOfLength(5), randomAlphaOfLength(5));
}
Expand Down

0 comments on commit 706fbb8

Please sign in to comment.