[feat][misc] PIP-264: Implement topic lookup metrics using OpenTelemetry #22058

Merged Mar 6, 2024 (45 commits). Showing changes from 17 commits.

Commits
a6cef35
Add Topic lookup Metrics
dragosvictor Feb 13, 2024
4d2aca1
Merge remote-tracking branch 'origin/master' into pip-264-metric-conv…
dragosvictor Feb 13, 2024
0011515
Fix conflict
dragosvictor Feb 13, 2024
5ea2aee
Add javadoc
dragosvictor Feb 13, 2024
1bb795c
Add test infra
dragosvictor Feb 13, 2024
291c7a0
Add TopicLookupMetricsTest
dragosvictor Feb 14, 2024
c065850
Add more tests
dragosvictor Feb 14, 2024
20c41c4
Update tests
dragosvictor Feb 14, 2024
03fd6af
Move some tests to BrokerServiceLookupTest
dragosvictor Feb 15, 2024
1f62a39
Validate metric pulsar.broker.lookup.answer
dragosvictor Feb 15, 2024
a954255
Validate metric pulsar.broker.lookup.latency
dragosvictor Feb 15, 2024
deb39b1
Validate metric pulsar.broker.lookup.pending.request
dragosvictor Feb 15, 2024
17f98da
Validate metric pulsar.broker.topic.load.pending.request
dragosvictor Feb 15, 2024
b0b0dc3
Remove redundant TopicLookupMetricsTest
dragosvictor Feb 15, 2024
94d27fd
Use seconds unit for latency histograms
dragosvictor Feb 15, 2024
118034c
Revert redundant test changes in BrokerServiceThrottlingTest
dragosvictor Feb 15, 2024
02522d3
Cosmetic fixes
dragosvictor Feb 15, 2024
8eef496
Add PulsarDeprecatedMetric annotation
dragosvictor Feb 16, 2024
301603f
Add metric utility class
dragosvictor Feb 16, 2024
a92d77f
Add limit metrics for throttling semaphores
dragosvictor Feb 17, 2024
1e9bc12
Test fixes
dragosvictor Feb 17, 2024
1b1be93
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Feb 20, 2024
44933ce
Clarify decision to implement MetricsUtil.convertToSeconds
dragosvictor Feb 20, 2024
3cedfc0
Update metric types
dragosvictor Feb 20, 2024
102a230
Improve test readability
dragosvictor Feb 20, 2024
517335c
Use LongUpDownCounter for pending ops usage counters
dragosvictor Feb 20, 2024
29f42f6
Rename metrics pulsar.broker.topic.load.operation.pending.[usage,limit]
dragosvictor Feb 21, 2024
3141eb6
Add unit to pulsar.broker.topic.load.operation.pending.*
dragosvictor Feb 21, 2024
91f9ec8
Use duration histogram for pulsar broker lookup response metrics
dragosvictor Feb 21, 2024
4cd1c42
Cleanup debug statement
dragosvictor Feb 21, 2024
7ec86cb
Fix test build
dragosvictor Feb 21, 2024
c9eb651
Refactor BrokerServiceLookupTest#testMultipleBrokerLookup
dragosvictor Feb 21, 2024
4d9fc00
Refactor PulsarService constructor
dragosvictor Feb 21, 2024
c0bc9c5
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Feb 22, 2024
a99b819
Update topic lookup pending ops metric name
dragosvictor Feb 22, 2024
34dd00d
Rename pendingTopicLoadOperations fields to match metric names
dragosvictor Feb 22, 2024
8810f7f
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Feb 27, 2024
7ae864c
Move MetricsUtil class to pulsar-common
dragosvictor Feb 27, 2024
301f4f9
Use UpDownCounter for limit metrics
dragosvictor Feb 29, 2024
73b67dd
Rename metrics
dragosvictor Mar 5, 2024
a014fb2
Merge remote-tracking branch 'origin/master' into pip-264-topic-looku…
dragosvictor Mar 5, 2024
229a7d2
Fix build
dragosvictor Mar 5, 2024
05cdde0
Rename metrics
dragosvictor Mar 5, 2024
0016a6d
Use failure response attribute
dragosvictor Mar 5, 2024
4ef6988
Rename attribute to pulsar.lookup.response
dragosvictor Mar 5, 2024
5 changes: 5 additions & 0 deletions pulsar-broker/pom.xml
@@ -484,6 +484,11 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -249,7 +249,8 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private final Timer brokerClientSharedTimer;

private MetricsGenerator metricsGenerator;
private PulsarBrokerOpenTelemetry openTelemetry;
@VisibleForTesting
protected PulsarBrokerOpenTelemetry openTelemetry;

private TransactionMetadataStoreService transactionMetadataStoreService;
private TransactionBufferProvider transactionBufferProvider;
@@ -312,6 +313,8 @@ public PulsarService(ServiceConfiguration config,
TransactionBatchedWriteValidator.validate(config);
this.config = config;

this.openTelemetry = new PulsarBrokerOpenTelemetry(config);

// validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);

@@ -902,7 +905,6 @@ public void start() throws PulsarServerException {
}

this.metricsGenerator = new MetricsGenerator(this);
this.openTelemetry = new PulsarBrokerOpenTelemetry(config);

// Initialize the message protocol handlers.
// start the protocol handlers only after the broker is ready,
@@ -25,6 +25,8 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.LongCounter;
import io.prometheus.client.Counter;
import java.net.URI;
import java.net.URL;
@@ -146,18 +148,24 @@ public class NamespaceService implements AutoCloseable {

private final RedirectManager redirectManager;


/** @deprecated by {@link #lookupRedirectsCounter} */
private static final Counter lookupRedirects = Counter.build("pulsar_broker_lookup_redirects", "-").register();
private final LongCounter lookupRedirectsCounter;
/** @deprecated by {@link #lookupFailuresCounter} */
private static final Counter lookupFailures = Counter.build("pulsar_broker_lookup_failures", "-").register();
private final LongCounter lookupFailuresCounter;
/** @deprecated by {@link #lookupAnswersCounter} */
private static final Counter lookupAnswers = Counter.build("pulsar_broker_lookup_answers", "-").register();
private final LongCounter lookupAnswersCounter;

/** @deprecated by {@link #lookupLatencyHistogram} */
private static final Summary lookupLatency = Summary.build("pulsar_broker_lookup", "-")
.quantile(0.50)
.quantile(0.99)
.quantile(0.999)
.quantile(1.0)
.register();

private final DoubleHistogram lookupLatencyHistogram;

/**
* Default constructor.
@@ -175,6 +183,25 @@ public NamespaceService(PulsarService pulsar) {
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
this.redirectManager = new RedirectManager(pulsar);

var meter = pulsar.getOpenTelemetry().getMeter();
this.lookupRedirectsCounter = meter
.counterBuilder("pulsar.broker.lookup.redirect")
.setDescription("The number of lookup redirected requests")
.build();
this.lookupFailuresCounter = meter
.counterBuilder("pulsar.broker.lookup.failure")
.setDescription("The number of lookup failures")
.build();
this.lookupAnswersCounter = meter
.counterBuilder("pulsar.broker.lookup.answer")
.setDescription("The number of lookup responses (i.e. not redirected requests)")
.build();
this.lookupLatencyHistogram = meter
.histogramBuilder("pulsar.broker.lookup.latency")
.setDescription("Lookup request latency")
.setUnit("s")
.build();
}

public void initialize() {
@@ -205,16 +232,21 @@ public CompletableFuture<Optional<LookupResult>> getBrokerServiceUrlAsync(TopicN
});

future.thenAccept(optResult -> {
lookupLatency.observe(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
var latencyNs = System.nanoTime() - startTime;
lookupLatency.observe(latencyNs, TimeUnit.NANOSECONDS);
lookupLatencyHistogram.record(latencyNs / 1_000_000_000.0);
if (optResult.isPresent()) {
if (optResult.get().isRedirect()) {
lookupRedirects.inc();
lookupRedirectsCounter.add(1);
} else {
lookupAnswers.inc();
lookupAnswersCounter.add(1);
}
}
}).exceptionally(ex -> {
lookupFailures.inc();
lookupFailuresCounter.add(1);
return null;
});
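
The hunk above records latency only when the lookup future completes normally, converts nanoseconds to fractional seconds by dividing by 1e9, and counts each outcome as a redirect, an answer, or a failure. The following is a minimal sketch of that control flow using only the JDK. `LookupOutcomeSketch`, `track`, and `nanosToSeconds` are hypothetical names, the `LongAdder` fields stand in for the OpenTelemetry counters, and `Optional<Boolean>` stands in for `Optional<LookupResult>` (true meaning "redirect").

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: LongAdder fields stand in for the OpenTelemetry counters.
class LookupOutcomeSketch {
    final LongAdder redirects = new LongAdder();
    final LongAdder answers = new LongAdder();
    final LongAdder failures = new LongAdder();
    // Most recent latency in seconds; a real histogram would aggregate every sample.
    volatile double lastLatencySeconds = -1.0;

    // Floating-point division keeps sub-second precision, unlike
    // TimeUnit.NANOSECONDS.toSeconds, which truncates to whole seconds.
    static double nanosToSeconds(long nanos) {
        return nanos / 1_000_000_000.0;
    }

    // Optional<Boolean> is an assumption standing in for Optional<LookupResult>.
    void track(CompletableFuture<Optional<Boolean>> future, long startNanos) {
        future.thenAccept(optIsRedirect -> {
            // Runs only on normal completion, so failures record no latency.
            lastLatencySeconds = nanosToSeconds(System.nanoTime() - startNanos);
            if (optIsRedirect.isPresent()) {
                if (optIsRedirect.get()) {
                    redirects.increment();
                } else {
                    answers.increment();
                }
            }
        }).exceptionally(ex -> {
            // Reached when the lookup future (or the callback above) fails.
            failures.increment();
            return null;
        });
    }

    public static void main(String[] args) {
        LookupOutcomeSketch sketch = new LookupOutcomeSketch();
        CompletableFuture<Optional<Boolean>> lookup = new CompletableFuture<>();
        sketch.track(lookup, System.nanoTime());
        lookup.complete(Optional.of(false));
        System.out.println("answers=" + sketch.answers.sum());
    }
}
```

An empty `Optional` increments none of the counters in this sketch, which mirrors the `isPresent()` guard in the diff.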

@@ -38,6 +38,7 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.metrics.ObservableLongUpDownCounter;
import io.prometheus.client.Histogram;
import java.io.Closeable;
import java.io.IOException;
@@ -241,8 +242,13 @@ public class BrokerService implements Closeable {
protected final AtomicReference<Semaphore> lookupRequestSemaphore;
protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;

/** @deprecated by {@link #pendingLookupRequestsCounter} */
private final ObserverGauge pendingLookupRequests;
private final ObservableLongUpDownCounter pendingLookupRequestsCounter;

/** @deprecated by {@link #pendingTopicLoadRequestsCounter} */
private final ObserverGauge pendingTopicLoadRequests;
private final ObservableLongUpDownCounter pendingTopicLoadRequestsCounter;

private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
@@ -403,15 +409,25 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.defaultServerBootstrap = defaultServerBootstrap();

this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-")
.supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest()
- lookupRequestSemaphore.get().availablePermits())
.supplier(this::getPendingLookupRequest)
.register();
this.pendingLookupRequestsCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder("pulsar.broker.lookup.pending.request")
.setDescription("The number of pending lookup requests in the broker. "
+ "When it reaches threshold \"maxConcurrentLookupRequest\" defined in broker.conf, "
+ "new requests are rejected.")
.buildWithCallback(measurement -> measurement.record(getPendingLookupRequest()));

this.pendingTopicLoadRequests = ObserverGauge.build(
"pulsar_broker_topic_load_pending_requests", "-")
.supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- topicLoadRequestSemaphore.get().availablePermits())
.supplier(this::getPendingTopicLoadRequests)
.register();
this.pendingTopicLoadRequestsCounter = pulsar.getOpenTelemetry().getMeter()
.upDownCounterBuilder("pulsar.broker.topic.load.pending.request")
.setDescription("The number of pending topic load operations in the broker. "
+ "When it reaches threshold \"maxConcurrentTopicLoadRequest\" defined in broker.conf, "
+ "new requests are rejected.")
.buildWithCallback(measurement -> measurement.record(getPendingTopicLoadRequests()));

this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils
.loadBrokerEntryMetadataInterceptors(pulsar.getConfiguration().getBrokerEntryMetadataInterceptors(),
@@ -423,6 +439,15 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.bundlesQuotas = new BundlesQuotas(pulsar);
}

private int getPendingLookupRequest() {
return pulsar.getConfig().getMaxConcurrentLookupRequest() - lookupRequestSemaphore.get().availablePermits();
}

private int getPendingTopicLoadRequests() {
return pulsar.getConfig().getMaxConcurrentTopicLoadRequest()
- topicLoadRequestSemaphore.get().availablePermits();
}

public void addTopicEventListener(TopicEventsListener... listeners) {
topicEventsDispatcher.addTopicEventListener(listeners);
getTopics().keys().forEach(topic ->
@@ -780,6 +805,8 @@ public CompletableFuture<Void> closeAsync() {
log.warn("Error in closing authenticationService", e);
}
pulsarStats.close();
pendingTopicLoadRequestsCounter.close();
pendingLookupRequestsCounter.close();
try {
delayedDeliveryTrackerFactory.close();
} catch (Exception e) {
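
The pending-request metrics in this file are derived rather than counted directly: the value is the configured maximum minus the permits still available on the throttling semaphore. A minimal JDK-only sketch of that calculation follows. `PendingRequestGauge`, `tryStart`, `finish`, and `pending` are hypothetical names, and holding the semaphore in an `AtomicReference` simply mirrors the field type shown in the diff.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of deriving "pending requests" from a throttling semaphore.
class PendingRequestGauge {
    private final int maxConcurrent;
    // Wrapped in an AtomicReference to mirror the field type used in BrokerService.
    private final AtomicReference<Semaphore> semaphore;

    PendingRequestGauge(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
        this.semaphore = new AtomicReference<>(new Semaphore(maxConcurrent));
    }

    // Returns false when the limit is reached, i.e. the request would be rejected.
    boolean tryStart() {
        return semaphore.get().tryAcquire();
    }

    void finish() {
        semaphore.get().release();
    }

    // Pending = configured limit minus permits that are still free.
    int pending() {
        return maxConcurrent - semaphore.get().availablePermits();
    }

    public static void main(String[] args) {
        PendingRequestGauge gauge = new PendingRequestGauge(2);
        gauge.tryStart();
        System.out.println("pending=" + gauge.pending());
    }
}
```

An asynchronous instrument callback can then call `pending()` at collection time, so no bookkeeping is needed on the request path.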
@@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.stats;

import com.google.common.annotations.VisibleForTesting;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import java.io.Closeable;
import java.util.function.Consumer;
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -34,10 +37,17 @@ public class PulsarBrokerOpenTelemetry implements Closeable {
private final Meter meter;

public PulsarBrokerOpenTelemetry(ServiceConfiguration config) {
this(config, null);
}

@VisibleForTesting
public PulsarBrokerOpenTelemetry(ServiceConfiguration config,
Consumer<AutoConfiguredOpenTelemetrySdkBuilder> builderCustomizer) {
openTelemetryService = OpenTelemetryService.builder()
.clusterName(config.getClusterName())
.serviceName(SERVICE_NAME)
.serviceVersion(PulsarVersion.getVersion())
.sdkBuilderConsumer(builderCustomizer)
.build();
meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
}
@@ -448,19 +448,27 @@ protected PulsarTestContext.Builder createPulsarTestContextBuilder(ServiceConfig
return builder;
}

protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
return createAdditionalPulsarTestContext(conf, null);
}
/**
* This method can be used in test classes for creating additional PulsarTestContext instances
* that share the same mock ZooKeeper and BookKeeper instances as the main PulsarTestContext instance.
*
* @param conf the ServiceConfiguration instance to use
* @param builderCustomizer a consumer that can be used to customize the builder configuration
* @return the PulsarTestContext instance
* @throws Exception if an error occurs
*/
protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf) throws Exception {
return createPulsarTestContextBuilder(conf)
protected PulsarTestContext createAdditionalPulsarTestContext(ServiceConfiguration conf,
Consumer<PulsarTestContext.Builder> builderCustomizer) throws Exception {
var builder = createPulsarTestContextBuilder(conf)
.reuseMockBookkeeperAndMetadataStores(pulsarTestContext)
.reuseSpyConfig(pulsarTestContext)
.build();
.reuseSpyConfig(pulsarTestContext);
if (builderCustomizer != null) {
builderCustomizer.accept(builder);
}
return builder.build();
}

protected void waitForZooKeeperWatchers() {
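
The change to `createAdditionalPulsarTestContext` follows a common pattern: keep the original one-argument overload, delegate to a new overload that accepts an optional `Consumer` over the builder, and apply the consumer only when it is non-null. The sketch below illustrates that pattern with the JDK alone. `ContextBuilderSketch`, its nested `Builder`, and the string returned by `build()` are hypothetical stand-ins, not the Pulsar test classes.

```java
import java.util.function.Consumer;

// Hypothetical sketch of an overload pair with an optional builder customizer.
class ContextBuilderSketch {
    static final class Builder {
        private boolean enableOpenTelemetry;

        Builder enableOpenTelemetry(boolean value) {
            this.enableOpenTelemetry = value;
            return this;
        }

        // A string stands in for the built test context.
        String build() {
            return "context(otel=" + enableOpenTelemetry + ")";
        }
    }

    // Existing callers keep working: no customizer means default settings.
    static String create() {
        return create(null);
    }

    static String create(Consumer<Builder> customizer) {
        Builder builder = new Builder();
        if (customizer != null) {
            customizer.accept(builder);
        }
        return builder.build();
    }

    public static void main(String[] args) {
        System.out.println(create());
        System.out.println(create(b -> b.enableOpenTelemetry(true)));
    }
}
```

Accepting `null` keeps the delegating overload trivial; an alternative would be to pass a no-op lambda and drop the null check.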
@@ -21,11 +21,13 @@

import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -52,6 +54,7 @@
import org.apache.pulsar.broker.resources.TopicResources;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.common.util.GracefulExecutorServicesShutdown;
import org.apache.pulsar.common.util.PortManager;
@@ -64,6 +67,7 @@
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.MetadataStoreFactoryImpl;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.opentelemetry.OpenTelemetryService;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.MockZooKeeperSession;
@@ -160,6 +164,8 @@ public class PulsarTestContext implements AutoCloseable {

private final boolean preallocatePorts;

private final boolean enableOpenTelemetry;
private final InMemoryMetricReader openTelemetryMetricReader;

public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerClientFactory.getManagedLedgerFactory();
@@ -727,11 +733,24 @@ protected void initializePulsarServices(SpyConfig spyConfig, Builder builder) {
.equals(PulsarCompactionServiceFactory.class.getName())) {
compactionServiceFactory = new MockPulsarCompactionServiceFactory(spyConfig, builder.compactor);
}
PulsarBrokerOpenTelemetry pulsarBrokerOpenTelemetry;
if (builder.enableOpenTelemetry) {
var reader = InMemoryMetricReader.create();
pulsarBrokerOpenTelemetry = new PulsarBrokerOpenTelemetry(builder.config, builderCustomizer -> {
builderCustomizer.addMeterProviderCustomizer(
(meterProviderBuilder, __) -> meterProviderBuilder.registerMetricReader(reader));
builderCustomizer.addPropertiesSupplier(
() -> Map.of(OpenTelemetryService.OTEL_SDK_DISABLED_KEY, "false"));
});
openTelemetryMetricReader(reader);
} else {
pulsarBrokerOpenTelemetry = null;
}
PulsarService pulsarService = spyConfig.getPulsarService()
.spy(StartableTestPulsarService.class, spyConfig, builder.config, builder.localMetadataStore,
builder.configurationMetadataStore, compactionServiceFactory,
builder.brokerInterceptor,
bookKeeperClientFactory, builder.brokerServiceCustomizer);
bookKeeperClientFactory, builder.brokerServiceCustomizer, pulsarBrokerOpenTelemetry);
if (compactionServiceFactory != null) {
compactionServiceFactory.initialize(pulsarService);
}
@@ -28,6 +28,7 @@
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.stats.PulsarBrokerOpenTelemetry;
import org.apache.pulsar.compaction.CompactionServiceFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;

@@ -44,10 +45,16 @@ public StartableTestPulsarService(SpyConfig spyConfig, ServiceConfiguration conf
CompactionServiceFactory compactionServiceFactory,
BrokerInterceptor brokerInterceptor,
BookKeeperClientFactory bookKeeperClientFactory,
Function<BrokerService, BrokerService> brokerServiceCustomizer) {
Function<BrokerService, BrokerService> brokerServiceCustomizer,
PulsarBrokerOpenTelemetry openTelemetry) {
super(spyConfig, config, localMetadataStore, configurationMetadataStore, compactionServiceFactory,
brokerInterceptor, bookKeeperClientFactory);
this.brokerServiceCustomizer = brokerServiceCustomizer;
if (openTelemetry != null) {
// Replace existing OpenTelemetry wrapper class.
this.openTelemetry.close();
this.openTelemetry = openTelemetry;
}
}

@Override
@@ -59,4 +66,4 @@ protected BrokerService newBrokerService(PulsarService pulsar) throws Exception
public Supplier<NamespaceService> getNamespaceServiceProvider() throws PulsarServerException {
return () -> spyConfig.getNamespaceService().spy(NamespaceService.class, this);
}
}
}