Skip to content

Commit

Permalink
Add support to fetch the full response of completed tasks from user_t…
Browse files Browse the repository at this point in the history
…asks endpoint. (#721)
  • Loading branch information
kun du committed May 22, 2019
1 parent d4638c5 commit 266623a
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public class ParameterUtils {
public static final String DISCARD_PARAM = "discard";
public static final String REPLICATION_FACTOR_PARAM = "replication_factor";
public static final String SKIP_RACK_AWARENESS_CHECK_PARAM = "skip_rack_awareness_check";
public static final String FETCH_COMPLETED_TASK_PARAM = "fetch_completed_task";
private static final int MAX_REASON_LENGTH = 50;

private static final Map<EndPoint, Set<String>> VALID_ENDPOINT_PARAM_NAMES;
Expand Down Expand Up @@ -240,6 +241,7 @@ public class ParameterUtils {
userTasks.add(ENTRIES_PARAM);
userTasks.add(ENDPOINTS_PARAM);
userTasks.add(TYPES_PARAM);
userTasks.add(FETCH_COMPLETED_TASK_PARAM);

Set<String> admin = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
admin.add(JSON_PARAM);
Expand Down Expand Up @@ -804,7 +806,9 @@ public static Set<String> clientIds(HttpServletRequest request) throws Unsupport
* Default: An empty set.
*/
public static Set<EndPoint> endPoints(HttpServletRequest request) throws UnsupportedEncodingException {
Set<String> parsedEndPoints = parseParamToStringSet(request, ENDPOINTS_PARAM);
Set<String> parsedEndPoints = parseParamToStringSet(request, ENDPOINTS_PARAM).stream()
.map(String::toUpperCase)
.collect(Collectors.toSet());

Set<EndPoint> endPoints = new HashSet<>();
for (EndPoint endPoint : EndPoint.cachedValues()) {
Expand Down Expand Up @@ -843,7 +847,7 @@ static DataFrom getDataFrom(HttpServletRequest request) {
}

/**
* Skip hard goal check in kafka_assigner mode,
* Skip hard goal check in kafka_assigner mode.
*/
static boolean skipHardGoalCheck(HttpServletRequest request) {
return isKafkaAssignerMode(request) || getBooleanParam(request, SKIP_HARD_GOAL_CHECK_PARAM, false);
Expand All @@ -865,6 +869,10 @@ static short replicationFactor(HttpServletRequest request) {
return Short.parseShort(request.getParameter(parameterString));
}

static boolean fetchCompletedTask(HttpServletRequest request) {
return getBooleanParam(request, FETCH_COMPLETED_TASK_PARAM, false);
}

public enum DataFrom {
VALID_WINDOWS, VALID_PARTITIONS
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Retrieve the recent user tasks.
* GET /kafkacruisecontrol/user_tasks?json=[true/false]&amp;user_task_ids=[Set-of-USER-TASK-IDS]&amp;client_ids=[Set-of-ClientIdentity]&amp;
* endpoints=[Set-of-{@link EndPoint}]&amp;types=[Set-of-{@link UserTaskManager.TaskState}]&amp;entries=[POSITIVE-INTEGER]
* &amp;fetch_completed_task=[true/false]
* </pre>
*/
public class UserTasksParameters extends AbstractParameters {
Expand All @@ -28,6 +29,7 @@ public class UserTasksParameters extends AbstractParameters {
private Set<EndPoint> _endPoints;
private Set<UserTaskManager.TaskState> _types;
private int _entries;
private boolean _fetchCompletedTask;

public UserTasksParameters(HttpServletRequest request, KafkaCruiseControlConfig config) {
super(request, config);
Expand All @@ -41,6 +43,7 @@ protected void initParameters() throws UnsupportedEncodingException {
_endPoints = ParameterUtils.endPoints(_request);
_types = ParameterUtils.types(_request);
_entries = ParameterUtils.entries(_request);
_fetchCompletedTask = ParameterUtils.fetchCompletedTask(_request);
}

public Set<UUID> userTaskIds() {
Expand All @@ -62,4 +65,8 @@ public Set<UserTaskManager.TaskState> types() {
public int entries() {
return _entries;
}

public boolean fetchCompletedTask() {
return _fetchCompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ public void discardIrrelevantResponse(CruiseControlParameters parameters) {
}
}
}

String cachedResponse() {
return _cachedResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand All @@ -35,6 +36,7 @@ public class UserTaskState extends AbstractCruiseControlResponse {
private static final String START_MS = "StartMs";
private static final String STATUS = "Status";
private static final String USER_TASKS = "userTasks";
private static final String ORIGINAL_RESPONSE = "originalResponse";
private final Map<UserTaskManager.TaskState, List<UserTaskManager.UserTaskInfo>> _userTasksByTaskState;

public UserTaskState(UserTaskManager userTaskManager, KafkaCruiseControlConfig config) {
Expand All @@ -44,10 +46,12 @@ public UserTaskState(UserTaskManager userTaskManager, KafkaCruiseControlConfig c
_userTasksByTaskState.put(UserTaskManager.TaskState.COMPLETED, userTaskManager.getCompletedUserTasks());
}

private String getJSONString(CruiseControlParameters parameters) {
private String getJSONString(UserTasksParameters parameters) {
List<Map<String, Object>> jsonUserTaskList = new ArrayList<>();
for (UserTaskManager.UserTaskInfo taskInfo : prepareResultList(parameters)) {
addJSONTask(jsonUserTaskList, taskInfo);
addJSONTask(jsonUserTaskList,
taskInfo,
parameters.fetchCompletedTask() && (taskInfo.state() != UserTaskManager.TaskState.ACTIVE));
}
Map<String, Object> jsonResponse = new HashMap<>();
jsonResponse.put(USER_TASKS, jsonUserTaskList);
Expand All @@ -56,8 +60,8 @@ private String getJSONString(CruiseControlParameters parameters) {
}

// Also used for testing
public List<UserTaskManager.UserTaskInfo> prepareResultList(CruiseControlParameters parameters) {
int entries = ((UserTasksParameters) parameters).entries();
public List<UserTaskManager.UserTaskInfo> prepareResultList(UserTasksParameters parameters) {
int entries = parameters.entries();
// If entries argument isn't given in request, we give MAX_VALUE to entries. Thus need to avoid instantiating
// to MAX_VALUE
List<UserTaskManager.UserTaskInfo> resultList = (entries == Integer.MAX_VALUE ? new ArrayList<>() : new ArrayList<>(entries));
Expand All @@ -69,14 +73,19 @@ public List<UserTaskManager.UserTaskInfo> prepareResultList(CruiseControlParamet
}

private void addJSONTask(List<Map<String, Object>> jsonUserTaskList,
UserTaskManager.UserTaskInfo userTaskInfo) {
Map<String, Object> jsonObjectMap = new HashMap<>();
UserTaskManager.UserTaskInfo userTaskInfo,
boolean fetchCompletedTask) {
Map<String, Object> jsonObjectMap = new HashMap<>(fetchCompletedTask ? 6 : 5);
String status = userTaskInfo.state().toString();
jsonObjectMap.put(USER_TASK_ID, userTaskInfo.userTaskId().toString());
jsonObjectMap.put(REQUEST_URL, userTaskInfo.requestWithParams());
jsonObjectMap.put(CLIENT_ID, userTaskInfo.clientIdentity());
jsonObjectMap.put(START_MS, Long.toString(userTaskInfo.startMs()));
jsonObjectMap.put(STATUS, status);
// Populate original response of completed task if requested so.
if (fetchCompletedTask) {
jsonObjectMap.put(ORIGINAL_RESPONSE, completedTaskResponse(userTaskInfo));
}
jsonUserTaskList.add(jsonObjectMap);
}

Expand All @@ -95,15 +104,15 @@ private <T> Predicate<UserTaskManager.UserTaskInfo> checkInputFilter(Set<T> set)
*/
private void populateFilteredTasks(List<UserTaskManager.UserTaskInfo> filteredTasks,
List<UserTaskManager.UserTaskInfo> userTasks,
CruiseControlParameters parameters,
UserTasksParameters parameters,
int entries) {
if (filteredTasks.size() >= entries) {
return;
}
Set<UUID> requestedUserTaskIds = ((UserTasksParameters) parameters).userTaskIds();
Set<UserTaskManager.TaskState> requestedTaskStates = ((UserTasksParameters) parameters).types();
Set<EndPoint> requestedEndPoints = ((UserTasksParameters) parameters).endPoints();
Set<String> requestedClientIds = ((UserTasksParameters) parameters).clientIds();
Set<UUID> requestedUserTaskIds = parameters.userTaskIds();
Set<UserTaskManager.TaskState> requestedTaskStates = parameters.types();
Set<EndPoint> requestedEndPoints = parameters.endPoints();
Set<String> requestedClientIds = parameters.clientIds();

Consumer<UserTaskManager.UserTaskInfo> consumer = (elem) -> {
if (filteredTasks.size() < entries) {
Expand All @@ -121,7 +130,7 @@ private void populateFilteredTasks(List<UserTaskManager.UserTaskInfo> filteredTa
.forEach(consumer);
}

private String getPlaintext(CruiseControlParameters parameters) {
private String getPlaintext(UserTasksParameters parameters) {
StringBuilder sb = new StringBuilder();
int padding = 2;
int userTaskIdLabelSize = 20;
Expand Down Expand Up @@ -156,19 +165,43 @@ private String getPlaintext(CruiseControlParameters parameters) {

sb.append(String.format(formattingStringBuilder.toString(), "USER TASK ID", "CLIENT ADDRESS", "START TIME", "STATUS",
"REQUEST URL")); // header
for (UserTaskManager.UserTaskInfo userTaskInfo : prepareResultList(parameters)) {
List<UserTaskManager.UserTaskInfo> taskInfoList = prepareResultList(parameters);
for (UserTaskManager.UserTaskInfo userTaskInfo : taskInfoList) {
String dateFormatted = KafkaCruiseControlUtils.toDateString(userTaskInfo.startMs(), DATE_FORMAT, TIME_ZONE);
sb.append(String.format(formattingStringBuilder.toString(), userTaskInfo.userTaskId().toString(), userTaskInfo.clientIdentity(),
dateFormatted, userTaskInfo.state(), userTaskInfo.requestWithParams())); // values
}

// Populate original response of completed tasks if requested so.
if (parameters.fetchCompletedTask()) {
for (UserTaskManager.UserTaskInfo userTaskInfo : taskInfoList) {
if (userTaskInfo.state() == UserTaskManager.TaskState.ACTIVE) {
continue;
}
sb.append("\nOriginal response for task ")
.append(userTaskInfo.userTaskId())
.append(":\n")
.append(completedTaskResponse(userTaskInfo));
}
}

return sb.toString();
}

private String completedTaskResponse(UserTaskManager.UserTaskInfo userTaskInfo) {
try {
CruiseControlResponse response = userTaskInfo.futures().get(userTaskInfo.futures().size() - 1).get();
return ((AbstractCruiseControlResponse) response).cachedResponse();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("Error happened in fetching response for task " + userTaskInfo.userTaskId().toString(), e);
}
}

@Override
protected void discardIrrelevantAndCacheRelevant(CruiseControlParameters parameters) {
// Cache relevant response.
_cachedResponse = parameters.json() ? getJSONString(parameters) : getPlaintext(parameters);
UserTasksParameters userTasksParameters = (UserTasksParameters) parameters;
_cachedResponse = userTasksParameters.json() ? getJSONString(userTasksParameters) :
getPlaintext(userTasksParameters);
// Discard irrelevant response.
_userTasksByTaskState.get(UserTaskManager.TaskState.ACTIVE).clear();
_userTasksByTaskState.get(UserTaskManager.TaskState.COMPLETED).clear();
Expand Down

0 comments on commit 266623a

Please sign in to comment.