Skip to content

Commit

Permalink
Metrics for each of the RC Actor messages (#576)
Browse files Browse the repository at this point in the history
* Metrics for each of the RC Actor messages

* Address andy comments

---------

Co-authored-by: Sundaram Ananthanarayanan <sananthanarayanan@netflix.com>
  • Loading branch information
sundargates and sundargates authored Nov 2, 2023
1 parent fcfb1e4 commit 7d098a8
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.netflix.spectator.api.TagList;
import io.mantisrx.common.Ack;
import io.mantisrx.common.WorkerConstants;
import io.mantisrx.master.resourcecluster.metrics.ResourceClusterActorMetrics;
import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest;
import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesResponse;
import io.mantisrx.server.core.CacheJobArtifactsRequest;
Expand Down Expand Up @@ -191,35 +190,39 @@ public Receive createReceive() {
.match(GetActiveJobsRequest.class, this::getActiveJobs)
.match(GetTaskExecutorStatusRequest.class, this::getTaskExecutorStatus)
.match(GetClusterUsageRequest.class,
req -> sender().tell(this.executorStateManager.getClusterUsage(req), self()))
metrics.withTracking(req ->
sender().tell(this.executorStateManager.getClusterUsage(req), self())))
.match(GetClusterIdleInstancesRequest.class,
req -> sender().tell(onGetClusterIdleInstancesRequest(req), self()))
metrics.withTracking(req ->
sender().tell(onGetClusterIdleInstancesRequest(req), self())))
.match(GetAssignedTaskExecutorRequest.class, this::onAssignedTaskExecutorRequest)
.match(Ack.class, ack -> log.info("Received ack from {}", sender()))

.match(TaskExecutorAssignmentTimeout.class, this::onTaskExecutorAssignmentTimeout)
.match(TaskExecutorRegistration.class, this::onTaskExecutorRegistration)
.match(InitializeTaskExecutorRequest.class, this::onTaskExecutorInitialization)
.match(TaskExecutorHeartbeat.class, this::onHeartbeat)
.match(TaskExecutorRegistration.class, metrics.withTracking(this::onTaskExecutorRegistration))
.match(InitializeTaskExecutorRequest.class, metrics.withTracking(this::onTaskExecutorInitialization))
.match(TaskExecutorHeartbeat.class, metrics.withTracking(this::onHeartbeat))
.match(TaskExecutorStatusChange.class, this::onTaskExecutorStatusChange)
.match(TaskExecutorDisconnection.class, this::onTaskExecutorDisconnection)
.match(HeartbeatTimeout.class, this::onTaskExecutorHeartbeatTimeout)
.match(TaskExecutorAssignmentRequest.class, this::onTaskExecutorAssignmentRequest)
.match(TaskExecutorDisconnection.class, metrics.withTracking(this::onTaskExecutorDisconnection))
.match(HeartbeatTimeout.class, metrics.withTracking(this::onTaskExecutorHeartbeatTimeout))
.match(TaskExecutorAssignmentRequest.class, metrics.withTracking(this::onTaskExecutorAssignmentRequest))
.match(ResourceOverviewRequest.class, this::onResourceOverviewRequest)
.match(TaskExecutorInfoRequest.class, this::onTaskExecutorInfoRequest)
.match(TaskExecutorGatewayRequest.class, this::onTaskExecutorGatewayRequest)
.match(TaskExecutorGatewayRequest.class, metrics.withTracking(this::onTaskExecutorGatewayRequest))
.match(DisableTaskExecutorsRequest.class, this::onNewDisableTaskExecutorsRequest)
.match(CheckDisabledTaskExecutors.class, this::findAndMarkDisabledTaskExecutors)
.match(ExpireDisableTaskExecutorsRequest.class, this::onDisableTaskExecutorsRequestExpiry)
.match(GetTaskExecutorWorkerMappingRequest.class, req -> sender().tell(getTaskExecutorWorkerMapping(req.getAttributes()), self()))
.match(PublishResourceOverviewMetricsRequest.class, this::onPublishResourceOverviewMetricsRequest)
.match(CacheJobArtifactsOnTaskExecutorRequest.class, this::onCacheJobArtifactsOnTaskExecutorRequest)
.match(CacheJobArtifactsOnTaskExecutorRequest.class, metrics.withTracking(this::onCacheJobArtifactsOnTaskExecutorRequest))
.match(AddNewJobArtifactsToCacheRequest.class, this::onAddNewJobArtifactsToCacheRequest)
.match(RemoveJobArtifactsToCacheRequest.class, this::onRemoveJobArtifactsToCacheRequest)
.match(GetJobArtifactsToCacheRequest.class, req -> sender().tell(new ArtifactList(new ArrayList<>(jobArtifactsToCache)), self()))
.build();
}



private void onAddNewJobArtifactsToCacheRequest(AddNewJobArtifactsToCacheRequest req) {
try {
Set<ArtifactID> newArtifacts = new HashSet<>(req.artifacts);
Expand Down Expand Up @@ -813,7 +816,7 @@ private Set<String> getJobClustersWithArtifactCachingEnabled() {
}

@Value
private static class HeartbeatTimeout {
static class HeartbeatTimeout {

TaskExecutorID taskExecutorID;
Instant lastActivity;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2023 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.mantisrx.master.resourcecluster;

import akka.japi.pf.FI;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.Timer;
import io.mantisrx.common.metrics.spectator.MetricId;
import io.mantisrx.common.metrics.spectator.SpectatorRegistryFactory;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.CacheJobArtifactsOnTaskExecutorRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetClusterUsageRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.HeartbeatTimeout;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.InitializeTaskExecutorRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
import io.mantisrx.master.resourcecluster.proto.GetClusterIdleInstancesRequest;
import io.mantisrx.server.master.resourcecluster.TaskExecutorDisconnection;
import io.mantisrx.server.master.resourcecluster.TaskExecutorHeartbeat;
import io.mantisrx.server.master.resourcecluster.TaskExecutorRegistration;
import io.mantisrx.shaded.com.google.common.collect.ImmutableMap;
import io.vavr.Tuple2;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
class ResourceClusterActorMetrics {

private static final String METRIC_GROUP_ID = "ResourceClusterActor";

public static final String NUM_REGISTERED_TE = "numRegisteredTaskExecutors";
public static final String NUM_BUSY_TE = "numBusyTaskExecutors";
public static final String NUM_AVAILABLE_TE = "numAvailableTaskExecutors";
public static final String NUM_DISABLED_TE = "numDisabledTaskExecutors";
public static final String NUM_UNREGISTERED_TE = "numUnregisteredTaskExecutors";
public static final String NUM_ASSIGNED_TE = "numAssignedTaskExecutors";
public static final String NO_RESOURCES_AVAILABLE = "noResourcesAvailable";
public static final String HEARTBEAT_TIMEOUT = "taskExecutorHeartbeatTimeout";

public static final String TE_CONNECTION_FAILURE = "taskExecutorConnectionFailure";
public static final String MAX_JOB_ARTIFACTS_TO_CACHE_REACHED = "maxJobArtifactsToCacheReached";

private final Registry registry;
private final Map<Class<?>, Tuple2<Counter, Timer>> messageMetrics;
private final Tuple2<Counter, Timer> unknownMessageMetrics;

private Id getMessageReceivedId(String messageName) {
return new MetricId(METRIC_GROUP_ID, "messagesReceived",
Tag.of("messageType", messageName)).getSpectatorId(registry);
}

private Id getMessageProcessingLatencyId(String messageName) {
return new MetricId(METRIC_GROUP_ID, "messageProcessingLatency",
Tag.of("messageType", messageName)).getSpectatorId(registry);
}

private Tuple2<Counter, Timer> getBoth(String messageName) {
return new Tuple2<>(
registry.counter(getMessageReceivedId(messageName)),
registry.timer(getMessageProcessingLatencyId(messageName)));
}

public ResourceClusterActorMetrics() {
this.registry = SpectatorRegistryFactory.getRegistry();
this.messageMetrics = ImmutableMap.of(
TaskExecutorRegistration.class, getBoth("TaskExecutorRegistration"),
InitializeTaskExecutorRequest.class, getBoth("InitializeTaskExecutorRequest"),
TaskExecutorHeartbeat.class, getBoth("TaskExecutorHeartbeat"),
TaskExecutorDisconnection.class, getBoth("TaskExecutorDisconnection"),
HeartbeatTimeout.class, getBoth("HeartbeatTimeout"),
TaskExecutorAssignmentRequest.class, getBoth("TaskExecutorAssignmentRequest"),
TaskExecutorGatewayRequest.class, getBoth("TaskExecutorGatewayRequest"),
CacheJobArtifactsOnTaskExecutorRequest.class,
getBoth("CacheJobArtifactsOnTaskExecutorRequest"),
GetClusterUsageRequest.class, getBoth("GetClusterUsageRequest"),
GetClusterIdleInstancesRequest.class, getBoth("GetClusterIdleInstancesRequest")
);
this.unknownMessageMetrics = getBoth("UnknownMessage");
}

public void setGauge(final String metric, final long value, final Iterable<Tag> tags) {
registry.gauge(new MetricId(METRIC_GROUP_ID, metric, tags).getSpectatorId(registry))
.set(value);
}

public void incrementCounter(final String metric, final Iterable<Tag> tags) {
registry.counter(new MetricId(METRIC_GROUP_ID, metric, tags).getSpectatorId(registry))
.increment();
}

public <P> FI.UnitApply<P> withTracking(final FI.UnitApply<P> apply) {
return p -> {
final long start = System.nanoTime();
try {
apply.apply(p);
} finally {
final Class<?> pClass = p.getClass();
messageMetrics.getOrDefault(pClass, unknownMessageMetrics)
.apply((counter, timer) -> {
counter.increment();
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
return null;
});
}
};
}
}

This file was deleted.

0 comments on commit 7d098a8

Please sign in to comment.