Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][misc] PIP-264: Add OpenTelemetry topic lookup metrics #11

Closed
wants to merge 45 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
a6cef35
Add Topic lookup Metrics
dragosvictor Feb 13, 2024
4d2aca1
Merge remote-tracking branch 'origin/master' into pip-264-metric-conv…
dragosvictor Feb 13, 2024
0011515
Fix conflict
dragosvictor Feb 13, 2024
5ea2aee
Add javadoc
dragosvictor Feb 13, 2024
1bb795c
Add test infra
dragosvictor Feb 13, 2024
291c7a0
Add TopicLookupMetricsTest
dragosvictor Feb 14, 2024
c065850
Add more tests
dragosvictor Feb 14, 2024
20c41c4
Update tests
dragosvictor Feb 14, 2024
03fd6af
Move some tests to BrokerServiceLookupTest
dragosvictor Feb 15, 2024
1f62a39
Validate metric pulsar.broker.lookup.answer
dragosvictor Feb 15, 2024
a954255
Validate metric pulsar.broker.lookup.latency
dragosvictor Feb 15, 2024
deb39b1
Validate metric pulsar.broker.lookup.pending.request
dragosvictor Feb 15, 2024
17f98da
Validate metric pulsar.broker.topic.load.pending.request
dragosvictor Feb 15, 2024
b0b0dc3
Remove redundant TopicLookupMetricsTest
dragosvictor Feb 15, 2024
94d27fd
Use seconds unit for latency histograms
dragosvictor Feb 15, 2024
118034c
Revert redundant test changes in BrokerServiceThrottlingTest
dragosvictor Feb 15, 2024
02522d3
Cosmetic fixes
dragosvictor Feb 15, 2024
8eef496
Add PulsarDeprecatedMetric annotation
dragosvictor Feb 16, 2024
301603f
Add metric utility class
dragosvictor Feb 16, 2024
a92d77f
Add limit metrics for throttling semaphores
dragosvictor Feb 17, 2024
1e9bc12
Test fixes
dragosvictor Feb 17, 2024
1b1be93
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Feb 20, 2024
44933ce
Clarify decision to implement MetricsUtil.convertToSeconds
dragosvictor Feb 20, 2024
3cedfc0
Update metric types
dragosvictor Feb 20, 2024
102a230
Improve test readability
dragosvictor Feb 20, 2024
517335c
Use LongUpDownCounter for pending ops usage counters
dragosvictor Feb 20, 2024
29f42f6
Rename metrics pulsar.broker.topic.load.operation.pending.[usage,limit]
dragosvictor Feb 21, 2024
3141eb6
Add unit to pulsar.broker.topic.load.operation.pending.*
dragosvictor Feb 21, 2024
91f9ec8
Use duration histogram for pulsar broker lookup response metrics
dragosvictor Feb 21, 2024
4cd1c42
Cleanup debug statement
dragosvictor Feb 21, 2024
7ec86cb
Fix test build
dragosvictor Feb 21, 2024
c9eb651
Refactor BrokerServiceLookupTest#testMultipleBrokerLookup
dragosvictor Feb 21, 2024
4d9fc00
Refactor PulsarService constructor
dragosvictor Feb 21, 2024
c0bc9c5
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Feb 22, 2024
a99b819
Update topic lookup pending ops metric name
dragosvictor Feb 22, 2024
34dd00d
Rename pendingTopicLoadOperations fields to match metric names
dragosvictor Feb 22, 2024
8810f7f
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Feb 27, 2024
7ae864c
Move MetricsUtil class to pulsar-common
dragosvictor Feb 27, 2024
301f4f9
Use UpDownCounter for limit metrics
dragosvictor Feb 29, 2024
73b67dd
Rename metrics
dragosvictor Mar 5, 2024
a014fb2
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Mar 5, 2024
229a7d2
Fix build
dragosvictor Mar 5, 2024
05cdde0
Rename metrics
dragosvictor Mar 5, 2024
0016a6d
Use failure response attribute
dragosvictor Mar 5, 2024
4ef6988
Rename attribute to pulsar.lookup.response
dragosvictor Mar 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pulsar-broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -249,7 +250,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final Timer brokerClientSharedTimer;

private MetricsGenerator metricsGenerator;
private PulsarBrokerOpenTelemetry openTelemetry;
private final PulsarBrokerOpenTelemetry openTelemetry;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
Expand Down Expand Up @@ -305,13 +306,23 @@ public PulsarService(ServiceConfiguration config,
WorkerConfig workerConfig,
Optional<WorkerService> functionWorkerService,
Consumer<Integer> processTerminator) {
this(config, workerConfig, functionWorkerService, processTerminator, null);
}

public PulsarService(ServiceConfiguration config,
WorkerConfig workerConfig,
Optional<WorkerService> functionWorkerService,
Consumer<Integer> processTerminator,
Consumer<AutoConfiguredOpenTelemetrySdkBuilder> openTelemetrySdkBuilderCustomizer) {
state = State.Init;

// Validate correctness of configuration
PulsarConfigurationLoader.isComplete(config);
TransactionBatchedWriteValidator.validate(config);
this.config = config;

this.openTelemetry = new PulsarBrokerOpenTelemetry(config, openTelemetrySdkBuilderCustomizer);

// validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);

Expand Down Expand Up @@ -902,7 +913,6 @@ public void start() throws PulsarServerException {
}

this.metricsGenerator = new MetricsGenerator(this);
this.openTelemetry = new PulsarBrokerOpenTelemetry(config);

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
Expand Down Expand Up @@ -97,10 +100,12 @@
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.policies.impl.NamespaceIsolationPolicies;
import org.apache.pulsar.common.stats.MetricsUtil;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.slf4j.Logger;
Expand Down Expand Up @@ -146,18 +151,37 @@ public class NamespaceService implements AutoCloseable {

private final RedirectManager redirectManager;


public static final String LOOKUP_REQUEST_DURATION_METRIC_NAME = "pulsar.broker.request.topic.lookup.duration";

private static final AttributeKey<String> PULSAR_LOOKUP_RESPONSE_ATTRIBUTE =
AttributeKey.stringKey("pulsar.lookup.response");
public static final Attributes PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES = Attributes.builder()
.put(PULSAR_LOOKUP_RESPONSE_ATTRIBUTE, "broker")
.build();
public static final Attributes PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES = Attributes.builder()
.put(PULSAR_LOOKUP_RESPONSE_ATTRIBUTE, "redirect")
.build();
public static final Attributes PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES = Attributes.builder()
.put(PULSAR_LOOKUP_RESPONSE_ATTRIBUTE, "failure")
.build();

@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupRedirects = Counter.build("pulsar_broker_lookup_redirects", "-").register();

@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupFailures = Counter.build("pulsar_broker_lookup_failures", "-").register();

@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Counter lookupAnswers = Counter.build("pulsar_broker_lookup_answers", "-").register();

@PulsarDeprecatedMetric(newMetricName = LOOKUP_REQUEST_DURATION_METRIC_NAME)
private static final Summary lookupLatency = Summary.build("pulsar_broker_lookup", "-")
.quantile(0.50)
.quantile(0.99)
.quantile(0.999)
.quantile(1.0)
.register();

private final DoubleHistogram lookupLatencyHistogram;

/**
* Default constructor.
Expand All @@ -175,6 +199,12 @@ public NamespaceService(PulsarService pulsar) {
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);

this.lookupLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
.histogramBuilder(LOOKUP_REQUEST_DURATION_METRIC_NAME)
.setDescription("The duration of topic lookup requests (either binary or HTTP)")
.setUnit("s")
.build();
}

public void initialize() {
Expand Down Expand Up @@ -204,18 +234,28 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
});
});

future.thenAccept(optResult -> {
lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
if (optResult.isPresent()) {
if (optResult.get().isRedirect()) {
lookupRedirects.inc();
future.whenComplete((lookupResult, throwable) -> {
var latencyNs = System.nanoTime() - startTime;
lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
Attributes attributes;
if (throwable == null) {
if (lookupResult.isPresent()) {
if (lookupResult.get().isRedirect()) {
lookupRedirects.inc();
attributes = PULSAR_LOOKUP_RESPONSE_REDIRECT_ATTRIBUTES;
} else {
lookupAnswers.inc();
attributes = PULSAR_LOOKUP_RESPONSE_BROKER_ATTRIBUTES;
}
} else {
lookupAnswers.inc();
// No lookup result, default to reporting as failure.
attributes = PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES;
}
} else {
lookupFailures.inc();
attributes = PULSAR_LOOKUP_RESPONSE_FAILURE_ATTRIBUTES;
}
}).exceptionally(ex -> {
lookupFailures.inc();
return null;
lookupLatencyHistogram.record(MetricsUtil.convertToSeconds(latencyNs, TimeUnit.NANOSECONDS), attributes);
});

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.prometheus.client.Histogram;
import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -179,6 +180,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.slf4j.Logger;
Expand Down Expand Up @@ -241,8 +243,19 @@ public class BrokerService implements Closeable {
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;

public static final String TOPIC_LOOKUP_USAGE_METRIC_NAME = "pulsar.broker.request.topic.lookup.concurrent.usage";
public static final String TOPIC_LOOKUP_LIMIT_METRIC_NAME = "pulsar.broker.request.topic.lookup.concurrent.limit";
@PulsarDeprecatedMetric(newMetricName = TOPIC_LOOKUP_USAGE_METRIC_NAME)
private final ObserverGauge pendingLookupRequests;
private final ObservableLongUpDownCounter pendingLookupOperationsCounter;
private final ObservableLongUpDownCounter pendingLookupOperationsLimitCounter;

public static final String TOPIC_LOAD_USAGE_METRIC_NAME = "pulsar.broker.topic.load.concurrent.usage";
public static final String TOPIC_LOAD_LIMIT_METRIC_NAME = "pulsar.broker.topic.load.concurrent.limit";
@PulsarDeprecatedMetric(newMetricName = TOPIC_LOAD_USAGE_METRIC_NAME)
private final ObserverGauge pendingTopicLoadRequests;
private final ObservableLongUpDownCounter pendingTopicLoadOperationsCounter;
private final ObservableLongUpDownCounter pendingTopicLoadOperationsLimitCounter;

private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
Expand Down Expand Up @@ -346,7 +359,6 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges);
pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);


this.inactivityMonitor = OrderedScheduler.newSchedulerBuilder()
.name("pulsar-inactivity-monitor")
.numThreads(1)
Expand Down Expand Up @@ -374,9 +386,9 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.topicFactory = createPersistentTopicFactory();
// update dynamic configuration and register-listener
updateConfigurationAndRegisterListeners();
this.lookupRequestSemaphore = new AtomicReference<Semaphore>(
this.lookupRequestSemaphore = new AtomicReference<>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), false));
this.topicLoadRequestSemaphore = new AtomicReference<Semaphore>(
this.topicLoadRequestSemaphore = new AtomicReference<>(
new Semaphore(pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(), false));
if (pulsar.getConfiguration().getMaxUnackedMessagesPerBroker() > 0
&& pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0.0) {
Expand All @@ -403,15 +415,41 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();

this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
.supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- lookupRequestSemaphore.get().availablePermits())
.supplier(this::getPendingLookupRequest)
.register();
this.pendingLookupOperationsCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOOKUP_USAGE_METRIC_NAME)
.setDescription("The number of pending lookup operations in the broker. "
+ "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, "
+ "new requests are rejected.")
.setUnit("{operation}")
.buildWithCallback(measurement -> measurement.record(getPendingLookupRequest()));
this.pendingLookupOperationsLimitCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOOKUP_LIMIT_METRIC_NAME)
.setDescription("The maximum number of pending lookup operations in the broker. "
+ "Equal to \"maxConcurrentLookupRequest\" defined in broker.conf.")
.setUnit("{operation}")
.buildWithCallback(
measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentLookupRequest()));

this.pendingTopicLoadRequests = ObserverGauge.build(
"pulsar_broker_topic_load_pending_requests", "-")
.supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- topicLoadRequestSemaphore.get().availablePermits())
"pulsar_broker_topic_load_pending_requests", "-")
.supplier(this::getPendingTopicLoadRequests)
.register();
this.pendingTopicLoadOperationsCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOAD_USAGE_METRIC_NAME)
.setDescription("The number of pending topic load operations in the broker. "
+ "When it reaches threshold \"maxConcurrentTopicLoadRequest\" defined in broker.conf, "
+ "new requests are rejected.")
.setUnit("{operation}")
.buildWithCallback(measurement -> measurement.record(getPendingTopicLoadRequests()));
this.pendingTopicLoadOperationsLimitCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder(TOPIC_LOAD_LIMIT_METRIC_NAME)
.setDescription("The maximum number of pending topic load operations in the broker. "
+ "Equal to \"maxConcurrentTopicLoadRequest\" defined in broker.conf.")
.setUnit("{operation}")
.buildWithCallback(
measurement -> measurement.record(pulsar.getConfig().getMaxConcurrentTopicLoadRequest()));

this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils
.loadBrokerEntryMetadataInterceptors(pulsar.getConfiguration().getBrokerEntryMetadataInterceptors(),
Expand All @@ -423,6 +461,15 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.bundlesQuotas = new BundlesQuotas(pulsar);
}

private int getPendingLookupRequest() {
return pulsar.getConfig().getMaxConcurrentLookupRequest() - lookupRequestSemaphore.get().availablePermits();
}

private int getPendingTopicLoadRequests() {
return pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- topicLoadRequestSemaphore.get().availablePermits();
}

public void addTopicEventListener(TopicEventsListener... listeners) {
topicEventsDispatcher.addTopicEventListener(listeners);
getTopics().keys().forEach(topic ->
Expand Down Expand Up @@ -780,6 +827,8 @@ public CompletableFuture<Void> closeAsync() {
log.warn("Error in closing authenticationService", e);
}
pulsarStats.close();
pendingTopicLoadOperationsCounter.close();
pendingLookupOperationsCounter.close();
try {
delayedDeliveryTrackerFactory.close();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.stats;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.Closeable;
import java.util.function.Consumer;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand All @@ -33,11 +36,13 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
@Getter
private final Meter meter;

public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
public PulsarBrokerOpenTelemetry(ServiceConfiguration config,
@VisibleForTesting Consumer<AutoConfiguredOpenTelemetrySdkBuilder> builderCustomizer) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.builderCustomizer(builderCustomizer)
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,19 +448,27 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
return builder;
}

protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
return createAdditionalPulsarTestContext(conf, null);
}
/**
* This method can be used in test classes for creating additional PulsarTestContext instances
* that share the same mock ZooKeeper and BookKeeper instances as the main PulsarTestContext instance.
*
* @param conf the ServiceConfiguration instance to use
* @param builderCustomizer a consumer that can be used to customize the builder configuration
* @return the PulsarTestContext instance
* @throws Exception if an error occurs
*/
protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
return createPulsarTestContextBuilder(conf)
protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf,
Consumer<PulsarTestContext.Builder> builderCustomizer) throws Exception {
var builder = createPulsarTestContextBuilder(conf)
.reuseMockBookkeeperAndMetadataStores(pulsarTestContext)
.reuseSpyConfig(pulsarTestContext)
.build();
.reuseSpyConfig(pulsarTestContext);
if (builderCustomizer != null) {
builderCustomizer.accept(builder);
}
return builder.build();
}

protected void waitForZooKeeperWatchers() {
Expand Down
Loading
Loading