diff --git a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java index 62180412dbf98..aef0d0ccbc2ed 100644 --- a/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/action/admin/cluster/node/tasks/TasksIT.java @@ -543,6 +543,14 @@ public void testTasksCancellation() throws Exception { .get(); assertEquals(1, cancelTasksResponse.getTasks().size()); + // Tasks are marked as cancelled at this point but not yet completed. + List taskInfoList = client().admin().cluster().prepareListTasks() + .setActions(TestTaskPlugin.TestTaskAction.NAME + "*").get().getTasks(); + for (TaskInfo taskInfo: taskInfoList) { + assertTrue(taskInfo.isCancelled()); + assertNotEquals(-1, taskInfo.getCancellationStartTime()); + assertNotEquals(-1, taskInfo.getRunningTimeSinceCancellationNanos()); + } future.get(); logger.info("--> checking that test tasks are not running"); diff --git a/server/src/main/java/org/opensearch/tasks/CancellableTask.java b/server/src/main/java/org/opensearch/tasks/CancellableTask.java index 336f5b1f4c244..01276471b7595 100644 --- a/server/src/main/java/org/opensearch/tasks/CancellableTask.java +++ b/server/src/main/java/org/opensearch/tasks/CancellableTask.java @@ -50,6 +50,14 @@ public abstract class CancellableTask extends Task { private volatile String reason; private final AtomicBoolean cancelled = new AtomicBoolean(false); private final TimeValue cancelAfterTimeInterval; + /** + * The task's cancellation start time as a wall clock time since epoch ({@link System#currentTimeMillis()} style). + */ + private long cancellationStartTime = -1; + /** + * The task's cancellation start time as a relative time ({@link System#nanoTime()} style). + */ + private long cancellationStartTimeNanos = -1; public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT); @@ -74,6 +82,8 @@ public CancellableTask( public void cancel(String reason) { assert reason != null; if (cancelled.compareAndSet(false, true)) { + this.cancellationStartTime = System.currentTimeMillis(); + this.cancellationStartTimeNanos = System.nanoTime(); this.reason = reason; onCancelled(); } @@ -87,6 +97,14 @@ public boolean cancelOnParentLeaving() { return true; } + public long getCancellationStartTime() { + return cancellationStartTime; + } + + public long getCancellationStartTimeNanos() { + return cancellationStartTimeNanos; + } + /** * Returns true if this task can potentially have children that need to be cancelled when it parent is cancelled. */ diff --git a/server/src/main/java/org/opensearch/tasks/Task.java b/server/src/main/java/org/opensearch/tasks/Task.java index 638c9c7ab41bd..8f1bb831849bf 100644 --- a/server/src/main/java/org/opensearch/tasks/Task.java +++ b/server/src/main/java/org/opensearch/tasks/Task.java @@ -192,6 +192,14 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status * Build a proper {@link TaskInfo} for this task. */ protected final TaskInfo taskInfo(String localNodeId, String description, Status status, TaskResourceStats resourceStats) { + boolean cancelled = this instanceof CancellableTask && ((CancellableTask) this).isCancelled(); + long cancellationStartTime = -1; + long runningTimeSinceCancellationNanos = -1; + if (cancelled) { + cancellationStartTime = ((CancellableTask) this).getCancellationStartTime(); + runningTimeSinceCancellationNanos = + System.nanoTime() - ((CancellableTask) this).getCancellationStartTimeNanos(); + } return new TaskInfo( new TaskId(localNodeId, getId()), getType(), @@ -201,10 +209,12 @@ protected final TaskInfo taskInfo(String localNodeId, String description, Status startTime, System.nanoTime() - startTimeNanos, this instanceof CancellableTask, - this instanceof CancellableTask && ((CancellableTask) this).isCancelled(), + cancelled, parentTask, headers, - resourceStats + resourceStats, + cancellationStartTime, + runningTimeSinceCancellationNanos ); } diff --git a/server/src/main/java/org/opensearch/tasks/TaskInfo.java b/server/src/main/java/org/opensearch/tasks/TaskInfo.java index 63bc46f8cfca6..172c8f4e57a3f 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskInfo.java +++ b/server/src/main/java/org/opensearch/tasks/TaskInfo.java @@ -85,6 +85,10 @@ public final class TaskInfo implements Writeable, ToXContentFragment { private final boolean cancelled; + private final long cancellationStartTime; + + private final long runningTimeSinceCancellationNanos; + private final TaskId parentTaskId; private final Map headers; @@ -104,6 +108,26 @@ public TaskInfo( TaskId parentTaskId, Map headers, TaskResourceStats resourceStats + ) { + this(taskId, type, action, description, status, startTime, runningTimeNanos, cancellable, cancelled, + parentTaskId, headers, resourceStats, -1, -1); + } + + public TaskInfo( + TaskId taskId, + String type, + String action, + String description, + Task.Status status, + long startTime, + long runningTimeNanos, + boolean cancellable, + boolean cancelled, + TaskId parentTaskId, + Map headers, + TaskResourceStats resourceStats, + long cancelledStartTime, + long runningTimeSinceCancellationNanos ) { if (cancellable == false && cancelled == true) { throw new IllegalArgumentException("task cannot be cancelled"); @@ -120,6 +144,8 @@ public TaskInfo( this.parentTaskId = parentTaskId; this.headers = headers; this.resourceStats = resourceStats; + this.cancellationStartTime = cancelledStartTime; + this.runningTimeSinceCancellationNanos = runningTimeSinceCancellationNanos; } /** @@ -150,6 +176,13 @@ public TaskInfo(StreamInput in) throws IOException { } else { resourceStats = null; } + if (in.getVersion().onOrAfter(Version.V_2_8_0)) { + cancellationStartTime = in.readLong(); + runningTimeSinceCancellationNanos = in.readLong(); + } else { + cancellationStartTime = -1; + runningTimeSinceCancellationNanos = -1; + } } @Override @@ -170,6 +203,13 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_1_0)) { out.writeOptionalWriteable(resourceStats); } + if (out.getVersion().onOrAfter(Version.V_2_6_0)) { + out.writeLong(cancellationStartTime); + out.writeLong(runningTimeSinceCancellationNanos); + } else { + out.writeLong(-1); + out.writeLong(-1); + } } public TaskId getTaskId() { @@ -228,6 +268,14 @@ public boolean isCancelled() { return cancelled; } + public long getCancellationStartTime() { + return cancellationStartTime; + } + + public long getRunningTimeSinceCancellationNanos() { + return runningTimeSinceCancellationNanos; + } + /** * Returns the parent task id */ @@ -281,6 +329,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws resourceStats.toXContent(builder, params); builder.endObject(); } + if (cancellationStartTime != -1) { + builder.field("cancelled_at_millis", cancellationStartTime); + builder.field("running_time_since_cancellation_nanos", runningTimeSinceCancellationNanos); + } return builder; } @@ -308,6 +360,12 @@ public static TaskInfo fromXContent(XContentParser parser) { } @SuppressWarnings("unchecked") TaskResourceStats resourceStats = (TaskResourceStats) a[i++]; + long cancellationStartTime = -1; + long runningTimeSinceCancellationNanos = -1; + if (cancelled) { + cancellationStartTime = (Long) a[i++]; + runningTimeSinceCancellationNanos = (Long) a[i++]; + } RawTaskStatus status = statusBytes == null ? null : new RawTaskStatus(statusBytes); TaskId parentTaskId = parentTaskIdString == null ? TaskId.EMPTY_TASK_ID : new TaskId(parentTaskIdString); return new TaskInfo( @@ -322,7 +380,9 @@ public static TaskInfo fromXContent(XContentParser parser) { cancelled, parentTaskId, headers, - resourceStats + resourceStats, + cancellationStartTime, + runningTimeSinceCancellationNanos ); }); static { @@ -341,6 +401,8 @@ public static TaskInfo fromXContent(XContentParser parser) { PARSER.declareString(optionalConstructorArg(), new ParseField("parent_task_id")); PARSER.declareObject(optionalConstructorArg(), (p, c) -> p.mapStrings(), new ParseField("headers")); PARSER.declareObject(optionalConstructorArg(), (p, c) -> TaskResourceStats.fromXContent(p), new ParseField("resource_stats")); + PARSER.declareLong(optionalConstructorArg(), new ParseField("cancelled_at_millis")); + PARSER.declareLong(optionalConstructorArg(), new ParseField("running_time_since_cancellation_nanos")); } @Override @@ -366,7 +428,9 @@ public boolean equals(Object obj) { && Objects.equals(cancelled, other.cancelled) && Objects.equals(status, other.status) && Objects.equals(headers, other.headers) - && Objects.equals(resourceStats, other.resourceStats); + && Objects.equals(resourceStats, other.resourceStats) + && Objects.equals(cancellationStartTime, other.cancellationStartTime) + && Objects.equals(runningTimeSinceCancellationNanos, other.runningTimeSinceCancellationNanos); } @Override @@ -383,7 +447,9 @@ public int hashCode() { cancelled, status, headers, - resourceStats + resourceStats, + cancellationStartTime, + runningTimeSinceCancellationNanos ); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java index 45db94577f15f..73712fb520e7d 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskTests.java @@ -89,6 +89,8 @@ public void testCancellableOptionWhenCancelledTrue() { long taskId = randomIntBetween(0, 100000); long startTime = randomNonNegativeLong(); long runningTime = randomNonNegativeLong(); + long cancellationStartTime = randomNonNegativeLong(); + long runningTimeSinceCancellationNanos = randomNonNegativeLong(); boolean cancellable = true; boolean cancelled = true; TaskInfo taskInfo = new TaskInfo( @@ -103,12 +105,16 @@ public void testCancellableOptionWhenCancelledTrue() { cancelled, TaskId.EMPTY_TASK_ID, Collections.singletonMap("foo", "bar"), - randomResourceStats(randomBoolean()) + randomResourceStats(randomBoolean()), + cancellationStartTime, + runningTimeSinceCancellationNanos ); String taskInfoString = taskInfo.toString(); Map map = XContentHelper.convertToMap(new BytesArray(taskInfoString.getBytes(StandardCharsets.UTF_8)), true).v2(); assertEquals(map.get("cancellable"), cancellable); assertEquals(map.get("cancelled"), cancelled); + assertEquals(map.get("cancelled_at_millis"), cancellationStartTime); + assertEquals(map.get("running_time_since_cancellation_nanos"), runningTimeSinceCancellationNanos); } public void testCancellableOptionWhenCancelledFalse() { diff --git a/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java b/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java index 3890c555df2f9..e0e8df7f0349e 100644 --- a/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java +++ b/server/src/test/java/org/opensearch/tasks/ListTasksResponseTests.java @@ -61,6 +61,7 @@ public void testEmptyToString() { assertEquals("{\n" + " \"tasks\" : [ ]\n" + "}", new ListTasksResponse(null, null, null).toString()); } + // Here public void testNonEmptyToString() { TaskInfo info = new TaskInfo( new TaskId("node1", 1), @@ -78,7 +79,10 @@ public void testNonEmptyToString() { { put("dummy-type1", new TaskResourceUsage(100, 100)); } - }) + }), + 0, + 1 + ); ListTasksResponse tasksResponse = new ListTasksResponse(singletonList(info), emptyList(), emptyList()); assertEquals( @@ -105,7 +109,9 @@ public void testNonEmptyToString() { + " \"cpu_time_in_nanos\" : 100,\n" + " \"memory_in_bytes\" : 100\n" + " }\n" - + " }\n" + + " },\n" + + " \"cancelled_at_millis\" : 0,\n" + + " \"running_time_since_cancellation_nanos\" : 1\n" + " }\n" + " ]\n" + "}", diff --git a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java index c263972c9cb97..6832bdeedcade 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskInfoTests.java @@ -83,7 +83,7 @@ protected Predicate getRandomFieldsExcludeFilter() { @Override protected TaskInfo mutateInstance(TaskInfo info) { - switch (between(0, 10)) { + switch (between(0, 12)) { case 0: TaskId taskId = new TaskId(info.getTaskId().getNodeId() + randomAlphaOfLength(5), info.getTaskId().getId()); return new TaskInfo( @@ -266,6 +266,40 @@ protected TaskInfo mutateInstance(TaskInfo info) { info.getHeaders(), new TaskResourceStats(resourceUsageMap) ); + case 11: + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + info.isCancellable(), + info.isCancelled(), + info.getParentTaskId(), + info.getHeaders(), + info.getResourceStats(), + info.getCancellationStartTime() + between(1, 100), + info.getRunningTimeSinceCancellationNanos() + ); + case 12: + return new TaskInfo( + info.getTaskId(), + info.getType(), + info.getAction(), + info.getDescription(), + info.getStatus(), + info.getStartTime(), + info.getRunningTimeNanos(), + true, + true, + info.getParentTaskId(), + info.getHeaders(), + info.getResourceStats(), + info.getCancellationStartTime(), + info.getRunningTimeSinceCancellationNanos() + between(1, 100) + ); default: throw new IllegalStateException(); } @@ -285,6 +319,12 @@ static TaskInfo randomTaskInfo(boolean detailed) { long runningTimeNanos = randomLong(); boolean cancellable = randomBoolean(); boolean cancelled = cancellable == true ? randomBoolean() : false; + long cancellationStartTime = -1; + long runningTimeSinceCancellationNanos = -1; + if (cancelled) { + cancellationStartTime = randomLong(); + runningTimeSinceCancellationNanos = randomLong(); + } TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(); Map headers = randomBoolean() ? Collections.emptyMap() @@ -301,7 +341,9 @@ static TaskInfo randomTaskInfo(boolean detailed) { cancelled, parentTaskId, headers, - randomResourceStats(detailed) + randomResourceStats(detailed), + cancellationStartTime, + runningTimeSinceCancellationNanos ); } @@ -312,7 +354,7 @@ private static TaskId randomTaskId() { private static RawTaskStatus randomRawTaskStatus() { try (XContentBuilder builder = XContentBuilder.builder(Requests.INDEX_CONTENT_TYPE.xContent())) { builder.startObject(); - int fields = between(0, 10); + int fields = between(0, 12); for (int f = 0; f < fields; f++) { builder.field(randomAlphaOfLength(5), randomAlphaOfLength(5)); }