Skip to content

Commit

Permalink
[FLINK-26931][Connector/Pulsar] Make the producer name and consumer n…
Browse files Browse the repository at this point in the history
…ame unique for each instance. (#152)
  • Loading branch information
syhily authored Jul 20, 2022
1 parent 52a72d4 commit b829d73
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public static PulsarClient createClient(PulsarConfiguration configuration) {

/**
* PulsarAdmin shares almost the same configuration with PulsarClient, but we separate this
* create method for directly creating it.
* creating method for directly use it.
*/
public static PulsarAdmin createAdmin(PulsarConfiguration configuration) {
PulsarAdminBuilder builder = PulsarAdmin.builder();
Expand Down Expand Up @@ -200,15 +200,17 @@ private static Authentication createAuthentication(PulsarConfiguration configura
String authParamsString = configuration.get(PULSAR_AUTH_PARAMS);
return sneakyClient(
() -> AuthenticationFactory.create(authPluginClassName, authParamsString));
} else if (configuration.contains(PULSAR_AUTH_PARAM_MAP)) {
Map<String, String> paramsMap = configuration.get(PULSAR_AUTH_PARAM_MAP);
} else {
Map<String, String> paramsMap = configuration.getProperties(PULSAR_AUTH_PARAM_MAP);
if (paramsMap.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"No %s or %s provided",
PULSAR_AUTH_PARAMS.key(), PULSAR_AUTH_PARAM_MAP.key()));
}

return sneakyClient(
() -> AuthenticationFactory.create(authPluginClassName, paramsMap));
} else {
throw new IllegalArgumentException(
String.format(
"No %s or %s provided",
PULSAR_AUTH_PARAMS.key(), PULSAR_AUTH_PARAM_MAP.key()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public Map<String, String> getProperties(ConfigOption<Map<String, String>> optio
return properties;
}

/** Get an option value from the given config, convert it into the a new value instance. */
/** Get an option value from the given config, convert it into a new value instance. */
public <F, T> T get(ConfigOption<F> option, Function<F, T> convertor) {
F value = get(option);
if (value != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,11 @@ public PulsarSink<IN> build() {
if (!configBuilder.contains(PULSAR_PRODUCER_NAME)) {
LOG.warn(
"We recommend set a readable producer name through setProducerName(String) in production mode.");
} else {
String producerName = configBuilder.get(PULSAR_PRODUCER_NAME);
if (!producerName.contains("%s")) {
configBuilder.override(PULSAR_PRODUCER_NAME, producerName + " - %s");
}
}

checkNotNull(serializationSchema, "serializationSchema must be set.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pulsar.client.api.Schema;

import java.util.Map;
import java.util.UUID;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -77,7 +78,10 @@ public static <T> ProducerBuilder<T> createProducerBuilder(
PulsarClient client, Schema<T> schema, SinkConfiguration configuration) {
ProducerBuilder<T> builder = client.newProducer(schema);

configuration.useOption(PULSAR_PRODUCER_NAME, builder::producerName);
configuration.useOption(
PULSAR_PRODUCER_NAME,
producerName -> String.format(producerName, UUID.randomUUID()),
builder::producerName);
configuration.useOption(
PULSAR_SEND_TIMEOUT_MS,
Math::toIntExact,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,9 +515,14 @@ public PulsarSource<OUT> build() {
if (!configBuilder.contains(PULSAR_CONSUMER_NAME)) {
LOG.warn(
"We recommend set a readable consumer name through setConsumerName(String) in production mode.");
} else {
String consumerName = configBuilder.get(PULSAR_CONSUMER_NAME);
if (!consumerName.contains("%s")) {
configBuilder.override(PULSAR_CONSUMER_NAME, consumerName + " - %s");
}
}

// Since these implementation could be a lambda, make sure they are serializable.
// Since these implementations could be a lambda, make sure they are serializable.
checkState(isSerializable(startCursor), "StartCursor isn't serializable");
checkState(isSerializable(stopCursor), "StopCursor isn't serializable");
checkState(isSerializable(rangeGenerator), "RangeGenerator isn't serializable");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand Down Expand Up @@ -106,7 +107,10 @@ public static <T> ConsumerBuilder<T> createConsumerBuilder(
configuration.useOption(
PULSAR_MAX_TOTAL_RECEIVER_QUEUE_SIZE_ACROSS_PARTITIONS,
builder::maxTotalReceiverQueueSizeAcrossPartitions);
configuration.useOption(PULSAR_CONSUMER_NAME, builder::consumerName);
configuration.useOption(
PULSAR_CONSUMER_NAME,
consumerName -> String.format(consumerName, UUID.randomUUID()),
builder::consumerName);
configuration.useOption(PULSAR_READ_COMPACTED, builder::readCompacted);
configuration.useOption(PULSAR_PRIORITY_LEVEL, builder::priorityLevel);
configuration.useOption(
Expand Down

0 comments on commit b829d73

Please sign in to comment.