Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. #16421

Merged
merged 3 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938))
- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289))
- Add support for restoring from snapshot with search replicas ([#16111](https://github.com/opensearch-project/OpenSearch/pull/16111))
- Add logic in master service to optimize performance and retain detailed logging for critical cluster operations. ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
- Add Setting to adjust the primary constraint weights ([#16471](https://github.com/opensearch-project/OpenSearch/pull/16471))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,33 +299,37 @@ public static boolean assertNotMasterUpdateThread(String reason) {
}

private void runTasks(TaskInputs taskInputs) {
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);
final String summary;
if (logger.isTraceEnabled()) {
summary = taskInputs.taskSummaryGenerator.apply(true);
} else {
summary = taskInputs.taskSummaryGenerator.apply(false);
}

if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
logger.debug("processing [{}]: ignoring, cluster-manager service not started", summary);
return;
}

if (logger.isTraceEnabled()) {
logger.trace("executing cluster state update for [{}]", longSummary);
logger.trace("executing cluster state update for [{}]", summary);
} else {
logger.debug("executing cluster state update for [{}]", shortSummary);
logger.debug("executing cluster state update for [{}]", summary);
}

final ClusterState previousClusterState = state();

if (!previousClusterState.nodes().isLocalNodeElectedClusterManager() && taskInputs.runOnlyWhenClusterManager()) {
logger.debug("failing [{}]: local node is no longer cluster-manager", shortSummary);
logger.debug("failing [{}]: local node is no longer cluster-manager", summary);
taskInputs.onNoLongerClusterManager();
return;
}

final long computationStartTime = threadPool.preciseRelativeTimeInNanos();
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, shortSummary);
final TaskOutputs taskOutputs = calculateTaskOutputs(taskInputs, previousClusterState, summary);
taskOutputs.notifyFailedTasks();
final TimeValue computationTime = getTimeSince(computationStartTime);
logExecutionTime(computationTime, "compute cluster state update", shortSummary);
logExecutionTime(computationTime, "compute cluster state update", summary);

clusterManagerMetrics.recordLatency(
clusterManagerMetrics.clusterStateComputeHistogram,
Expand All @@ -337,25 +341,25 @@ private void runTasks(TaskInputs taskInputs) {
final long notificationStartTime = threadPool.preciseRelativeTimeInNanos();
taskOutputs.notifySuccessfulTasksOnUnchangedClusterState();
final TimeValue executionTime = getTimeSince(notificationStartTime);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", shortSummary);
logExecutionTime(executionTime, "notify listeners on unchanged cluster state", summary);
} else {
final ClusterState newClusterState = taskOutputs.newClusterState;
if (logger.isTraceEnabled()) {
logger.trace("cluster state updated, source [{}]\n{}", longSummary, newClusterState);
logger.trace("cluster state updated, source [{}]\n{}", summary, newClusterState);
} else {
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), shortSummary);
logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), summary);
}
final long publicationStartTime = threadPool.preciseRelativeTimeInNanos();
try {
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(shortSummary, newClusterState, previousClusterState);
ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(summary, newClusterState, previousClusterState);
// new cluster state, notify all listeners
final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
String nodesDeltaSummary = nodesDelta.shortSummary();
if (nodesDeltaSummary.length() > 0) {
logger.info(
"{}, term: {}, version: {}, delta: {}",
shortSummary,
summary,
newClusterState.term(),
newClusterState.version(),
nodesDeltaSummary
Expand All @@ -366,7 +370,7 @@ private void runTasks(TaskInputs taskInputs) {
logger.debug("publishing cluster state version [{}]", newClusterState.version());
publish(clusterChangedEvent, taskOutputs, publicationStartTime);
} catch (Exception e) {
handleException(shortSummary, publicationStartTime, newClusterState, e);
handleException(summary, publicationStartTime, newClusterState, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,25 +195,37 @@ void runIfNotProcessed(BatchedTask updateTask) {
if (toExecute.isEmpty() == false) {
Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
if (longSummaryRequired == null || !longSummaryRequired) {
return buildShortSummary(updateTask.batchingKey, toExecute.size());
final List<BatchedTask> sampleTasks = toExecute.stream()
.limit(Math.min(1000, toExecute.size()))
sumitasr marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList());
return buildShortSummary(updateTask.batchingKey, toExecute.size(), getSummary(updateTask, sampleTasks));
}
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
return getSummary(updateTask, toExecute);
};
taskBatcherListener.onBeginProcessing(toExecute);
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
}
}
}

private String buildShortSummary(final Object batchingKey, final int taskCount) {
return "Tasks batched with key: " + batchingKey.toString().split("\\$")[0] + " and count: " + taskCount;
private String getSummary(final BatchedTask updateTask, final List<BatchedTask> toExecute) {
final Map<String, List<BatchedTask>> processTasksBySource = new HashMap<>();
for (final BatchedTask task : toExecute) {
processTasksBySource.computeIfAbsent(task.source, s -> new ArrayList<>()).add(task);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
}

private String buildShortSummary(final Object batchingKey, final int taskCount, final String sampleTasks) {
return "Tasks batched with key: "
+ batchingKey.toString().split("\\$")[0]
+ ", count:"
+ taskCount
+ " and sample tasks: "
+ sampleTasks;
}

/**
Expand Down
Loading
Loading