Dedicate a Kafka producer to each of the publishing SLO buckets (#1490)
This is the evolution of #1488, where we already had multiple producers: instead of rotating publish calls over a fixed-size producer pool, each publishing SLO bucket now gets its own dedicated producer, keyed by its Kafka client id.
a1exsh authored Jan 18, 2023
1 parent b9d3658 commit 3266042
Showing 17 changed files with 160 additions and 107 deletions.
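At a glance, the factory now keys producers by Kafka client id instead of rotating over a fixed-size pool, so the publisher can request a producer per SLO bucket. The actual call site is not part of this page; a minimal sketch of the intended flow, with assumed bucket and variable names, might look like:

// Hypothetical call site (not part of this commit): publish a batch using the
// producer dedicated to its SLO bucket. Bucket names and variable names are assumed.
final String bucket = SLOBuckets.getNameForBatchSize(totalBatchSizeBytes);
final Producer<byte[], byte[]> producer = kafkaFactory.takeProducer(bucket);
try {
    producer.send(new ProducerRecord<>(topic, partition, key, payload));
} finally {
    kafkaFactory.releaseProducer(producer); // producers are reference-counted by the factory
}
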
@@ -32,6 +32,7 @@
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.zalando.nakadi.repository.kafka.KafkaTestHelper.createKafkaProperties;

public class KafkaRepositoryAT extends BaseAT {
@@ -47,7 +48,6 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int ZK_SESSION_TIMEOUT = 30000;
private static final int ZK_CONNECTION_TIMEOUT = 10000;
private static final int ZK_MAX_IN_FLIGHT_REQUESTS = 1000;
private static final int ACTIVE_PRODUCERS_COUNT = 4;
private static final int NAKADI_SEND_TIMEOUT = 10000;
private static final int NAKADI_POLL_TIMEOUT = 10000;
private static final Long DEFAULT_RETENTION_TIME = 100L;
@@ -57,6 +57,7 @@ public class KafkaRepositoryAT extends BaseAT {
private static final int KAFKA_REQUEST_TIMEOUT = 30000;
private static final int KAFKA_DELIVERY_TIMEOUT = 30000;
private static final int KAFKA_MAX_BLOCK_TIMEOUT = 5000;
private static final int KAFKA_METADATA_MAX_AGE_MS = 1000;
private static final String KAFKA_COMPRESSION_TYPE = "lz4";
private static final int KAFKA_BATCH_SIZE = 1048576;
private static final long KAFKA_BUFFER_MEMORY = KAFKA_BATCH_SIZE * 10L;
@@ -92,7 +93,6 @@ public void setup() {
DEFAULT_TOPIC_RETENTION,
DEFAULT_TOPIC_ROTATION,
DEFAULT_COMMIT_TIMEOUT,
ACTIVE_PRODUCERS_COUNT,
NAKADI_POLL_TIMEOUT,
NAKADI_SEND_TIMEOUT,
TIMELINE_WAIT_TIMEOUT,
@@ -109,7 +109,8 @@ public void setup() {

kafkaSettings = new KafkaSettings(KAFKA_RETRIES, KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
KAFKA_LINGER_MS, KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, KAFKA_ENABLE_AUTO_COMMIT,
KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE);
KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE,
KAFKA_METADATA_MAX_AGE_MS);
kafkaHelper = new KafkaTestHelper(KAFKA_URL);
defaultTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT, DEFAULT_CLEANUP_POLICY,
Optional.of(DEFAULT_RETENTION_TIME));
@@ -282,7 +283,11 @@ private KafkaTopicRepository createKafkaTopicRepository() {
Mockito
.doReturn(kafkaHelper.createProducer())
.when(factory)
.takeProducer();
.takeDefaultProducer();
Mockito
.doReturn(kafkaHelper.createProducer())
.when(factory)
.takeProducer(anyString());

return new KafkaTopicRepository.Builder()
.setKafkaZookeeper(kafkaZookeeper)
@@ -100,20 +100,27 @@ public void whenEventTypeRepartitionedTheNewSubscriptionShouldHaveUpdatedPartiti
final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(1);
NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
eventType.getName(), 1, x -> "{\"foo\":\"bar\"}", p -> "0");

NakadiTestUtils.repartitionEventType(eventType, 2);
Thread.sleep(1500);

final Subscription subscription = createSubscription(
RandomSubscriptionBuilder.builder()
.withEventType(eventType.getName())
.withStartFrom(BEGIN)
.buildSubscriptionBase());

final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
.create(URL, subscription.getId(), "")
.start();

NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
eventType.getName(), 1, x -> "{\"foo\":\"bar" + x + "\"}", p -> "1");

waitFor(() -> assertThat(clientAfterRepartitioning.getJsonBatches(), Matchers.hasSize(2)));

Assert.assertTrue(clientAfterRepartitioning.getJsonBatches().stream()
.anyMatch(event -> event.getCursor().getPartition().equals("1")));
.anyMatch(batch -> batch.getCursor().getPartition().equals("1")));
}

@Test(timeout = 10000)
@@ -118,6 +118,8 @@ public void whenEventTypeRepartitionedSubscriptionStartsStreamNewPartitions() th
NakadiTestUtils.repartitionEventType(eventType, 2);
TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false)));

Thread.sleep(1500);

final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
.create(URL, subscription.getId(), "")
.startWithAutocommit(streamBatches -> LOG.info("{}", streamBatches));
@@ -153,9 +155,10 @@ public void shouldRepartitionTimelinedEventType() throws Exception {
NakadiTestUtils.createTimeline(eventType.getName());

NakadiTestUtils.repartitionEventType(eventType, 2);

TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false)));

Thread.sleep(1500);

final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
.create(URL, subscription.getId(), "")
.startWithAutocommit(streamBatches -> LOG.info("{}", streamBatches));
@@ -102,7 +102,7 @@ public EventTypeControllerTestCase() {
@Before
public void init() throws Exception {

final NakadiSettings nakadiSettings = new NakadiSettings(32, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, 1,
final NakadiSettings nakadiSettings = new NakadiSettings(32, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60,
NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, 0, NAKADI_EVENT_MAX_BYTES,
NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "org/zalando/nakadi", "I am warning you",
"I am warning you, even more", "nakadi_archiver", "nakadi_to_s3", 100, 10000);
@@ -35,6 +35,7 @@
import org.zalando.nakadi.service.publishing.BinaryEventPublisher;
import org.zalando.nakadi.service.publishing.EventPublisher;
import org.zalando.nakadi.service.publishing.NakadiKpiPublisher;
import org.zalando.nakadi.util.SLOBuckets;

import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
@@ -173,7 +174,7 @@ private ResponseEntity postBinaryEvents(final String eventTypeName,
final int eventCount = result.getResponses().size();

final long totalSizeBytes = countingInputStream.getCount();
TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));
TracingService.setTag("slo_bucket", SLOBuckets.getNameForBatchSize(totalSizeBytes));

reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount);
reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client);
@@ -245,7 +246,7 @@ private ResponseEntity postEventInternal(final String eventTypeName,
final EventPublishResult result;

final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length;
TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));
TracingService.setTag("slo_bucket", SLOBuckets.getNameForBatchSize(totalSizeBytes));

if (delete) {
result = publisher.delete(eventsAsString, eventTypeName);
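The new org.zalando.nakadi.util.SLOBuckets helper is referenced above but its body is not part of this page; a rough sketch of the kind of size-to-bucket mapping it performs (the thresholds and bucket names below are placeholders, not the real values) could be:

// Illustrative sketch only — the actual thresholds and names live in the Nakadi sources.
public final class SLOBuckets {

    private static final long SMALL_BATCH_LIMIT_BYTES = 10_000;     // assumed threshold
    private static final long MEDIUM_BATCH_LIMIT_BYTES = 1_000_000; // assumed threshold

    private SLOBuckets() {
    }

    public static String getNameForBatchSize(final long batchSizeBytes) {
        if (batchSizeBytes <= SMALL_BATCH_LIMIT_BYTES) {
            return "small";
        }
        return batchSizeBytes <= MEDIUM_BATCH_LIMIT_BYTES ? "medium" : "large";
    }
}
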
4 changes: 3 additions & 1 deletion app/src/main/resources/application.yml
@@ -55,7 +55,6 @@ nakadi:
maxConnections: 5
maxStreamMemoryBytes: 50000000 # ~50 MB
kafka:
producers.count: 1
retries: 0
request.timeout.ms: 30000
instanceType: t2.large
@@ -72,6 +71,7 @@ nakadi:
enable.auto.commit: false
delivery.timeout.ms: 30000 # request.timeout.ms + linger.ms
max.block.ms: 5000 # kafka default 60000
metadata.max.age.ms: 300000 # 5 min, default
zookeeper:
connectionString: zookeeper://zookeeper:2181
sessionTimeoutMs: 10000
@@ -166,6 +166,8 @@ nakadi:
AUDIT_LOG_COLLECTION: true
EVENT_OWNER_SELECTOR_AUTHZ: false
ACCESS_LOG_ENABLED: true
kafka:
metadata.max.age.ms: 1000
kpi:
config:
stream-data-collection-frequency-ms: 100
@@ -15,7 +15,6 @@ public class NakadiSettings {
private final long defaultTopicRetentionMs;
private final long defaultTopicRotationMs;
private final long maxCommitTimeout;
private final int kafkaActiveProducersCount;
private final long kafkaPollTimeoutMs;
private final long kafkaSendTimeoutMs;
private final long timelineWaitTimeoutMs;
@@ -36,7 +35,6 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
@Value("${nakadi.topic.default.retentionMs}") final long defaultTopicRetentionMs,
@Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs,
@Value("${nakadi.stream.max.commitTimeout}") final long maxCommitTimeout,
@Value("${nakadi.kafka.producers.count}") final int kafkaActiveProducersCount,
@Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs,
@Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs,
@Value("${nakadi.timeline.wait.timeoutMs}") final long timelineWaitTimeoutMs,
@@ -60,7 +58,6 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
this.defaultTopicRetentionMs = defaultTopicRetentionMs;
this.defaultTopicRotationMs = defaultTopicRotationMs;
this.maxCommitTimeout = maxCommitTimeout;
this.kafkaActiveProducersCount = kafkaActiveProducersCount;
this.kafkaPollTimeoutMs = kafkaPollTimeoutMs;
this.kafkaSendTimeoutMs = kafkaSendTimeoutMs;
this.eventMaxBytes = eventMaxBytes;
@@ -99,10 +96,6 @@ public long getMaxCommitTimeout() {
return maxCommitTimeout;
}

public int getKafkaActiveProducersCount() {
return kafkaActiveProducersCount;
}

public long getKafkaPollTimeoutMs() {
return kafkaPollTimeoutMs;
}
@@ -64,8 +64,7 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic
zookeeperSettings.getZkConnectionTimeoutMs(),
nakadiSettings);
final KafkaLocationManager kafkaLocationManager = new KafkaLocationManager(zooKeeperHolder, kafkaSettings);
final KafkaFactory kafkaFactory = new KafkaFactory(kafkaLocationManager,
nakadiSettings.getKafkaActiveProducersCount());
final KafkaFactory kafkaFactory = new KafkaFactory(kafkaLocationManager);
final KafkaZookeeper zk = new KafkaZookeeper(zooKeeperHolder, objectMapper);
final KafkaTopicRepository kafkaTopicRepository =
new KafkaTopicRepository.Builder()
@@ -13,56 +13,51 @@

import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class KafkaFactory {

public static final String DEFAULT_PRODUCER_CLIENT_ID = "default";

private static final Logger LOG = LoggerFactory.getLogger(KafkaFactory.class);
private final KafkaLocationManager kafkaLocationManager;
private final Map<Producer<byte[], byte[]>, AtomicInteger> useCount;

private final ReadWriteLock rwLock;
private final List<Producer<byte[], byte[]>> activeProducers;
private final AtomicLong activeProducerCounter;
private final Map<Producer<byte[], byte[]>, AtomicInteger> useCountByProducer;
private final Map<String, Producer<byte[], byte[]>> activeProducerByClientId;

public KafkaFactory(final KafkaLocationManager kafkaLocationManager, final int numActiveProducers) {
public KafkaFactory(final KafkaLocationManager kafkaLocationManager) {
this.kafkaLocationManager = kafkaLocationManager;

this.useCount = new ConcurrentHashMap<>();
this.rwLock = new ReentrantReadWriteLock();

this.activeProducers = new ArrayList<>(numActiveProducers);
for (int i = 0; i < numActiveProducers; ++i) {
this.activeProducers.add(null);
}

this.activeProducerCounter = new AtomicLong(0);
this.useCountByProducer = new ConcurrentHashMap<>();
this.activeProducerByClientId = new HashMap<>();
}

@Nullable
private Producer<byte[], byte[]> takeUnderLock(final int index, final boolean canCreate) {
private Producer<byte[], byte[]> takeUnderLock(final String clientId, final boolean canCreate) {
final Lock lock = canCreate ? rwLock.writeLock() : rwLock.readLock();
lock.lock();
try {
Producer<byte[], byte[]> producer = activeProducers.get(index);
Producer<byte[], byte[]> producer = activeProducerByClientId.get(clientId);
if (null != producer) {
useCount.get(producer).incrementAndGet();
useCountByProducer.get(producer).incrementAndGet();
return producer;
} else if (canCreate) {
producer = createProducerInstance();
useCount.put(producer, new AtomicInteger(1));
activeProducers.set(index, producer);
producer = createProducerInstance(clientId);
useCountByProducer.put(producer, new AtomicInteger(1));
activeProducerByClientId.put(clientId, producer);

LOG.info("New producer instance created: " + producer);
LOG.info("New producer instance created with client id '{}': {}", clientId, producer);
return producer;
} else {
return null;
@@ -78,38 +73,40 @@ private Producer<byte[], byte[]> takeUnderLock(final int index, final boolean ca
*
* @return Initialized kafka producer instance.
*/
public Producer<byte[], byte[]> takeProducer() {
final int index = (int)(activeProducerCounter.incrementAndGet() % activeProducers.size());

Producer<byte[], byte[]> result = takeUnderLock(index, false);
public Producer<byte[], byte[]> takeProducer(final String clientId) {
Producer<byte[], byte[]> result = takeUnderLock(clientId, false);
if (null == result) {
result = takeUnderLock(index, true);
result = takeUnderLock(clientId, true);
}
return result;
}

public Producer<byte[], byte[]> takeDefaultProducer() {
return takeProducer(DEFAULT_PRODUCER_CLIENT_ID);
}

/**
* Release a Kafka producer that was obtained via {@link #takeProducer(String)} or {@link #takeDefaultProducer()}.
* Releasing a producer that was not obtained from this factory has no effect.
*
* @param producer Producer to release.
*/
public void releaseProducer(final Producer<byte[], byte[]> producer) {
final AtomicInteger counter = useCount.get(producer);
final AtomicInteger counter = useCountByProducer.get(producer);
if (counter != null && 0 == counter.decrementAndGet()) {
final boolean deleteProducer;
rwLock.readLock().lock();
try {
deleteProducer = !activeProducers.contains(producer);
deleteProducer = !activeProducerByClientId.containsValue(producer);
} finally {
rwLock.readLock().unlock();
}
if (deleteProducer) {
rwLock.writeLock().lock();
try {
if (counter.get() == 0 && null != useCount.remove(producer)) {
if (counter.get() == 0 && null != useCountByProducer.remove(producer)) {
LOG.info("Stopping producer instance - It was reported that instance should be refreshed " +
"and it is not used anymore: " + producer);
"and it is not used anymore: {}", producer);
producer.close();
}
} finally {
@@ -129,14 +126,15 @@ public void releaseProducer(final Producer<byte[], byte[]> producer) {
* @param producer Producer instance to terminate.
*/
public void terminateProducer(final Producer<byte[], byte[]> producer) {
LOG.info("Received signal to terminate producer " + producer);
LOG.info("Received signal to terminate producer: {}", producer);
rwLock.writeLock().lock();
try {
final int index = activeProducers.indexOf(producer);
if (index >= 0) {
activeProducers.set(index, null);
} else {
LOG.info("Signal for producer termination already received: " + producer);
final Optional<String> clientId = activeProducerByClientId.entrySet().stream()
.filter(kv -> kv.getValue() == producer)
.map(Map.Entry::getKey)
.findFirst();
if (clientId.isPresent()) {
activeProducerByClientId.remove(clientId.get());
}
} finally {
rwLock.writeLock().unlock();
@@ -210,8 +208,8 @@ public void close() {
}
}

protected Producer<byte[], byte[]> createProducerInstance() {
return new KafkaProducerCrutch(kafkaLocationManager.getKafkaProducerProperties(),
protected Producer<byte[], byte[]> createProducerInstance(final String clientId) {
return new KafkaProducerCrutch(kafkaLocationManager.getKafkaProducerProperties(clientId),
new KafkaCrutch(kafkaLocationManager));
}

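The map-based bookkeeping above keeps the take/release/terminate contract of the pooled version: terminating a producer only removes it from the active map, and the instance is closed on its last release. A small illustration of that lifecycle, assuming a KafkaFactory instance named factory and a made-up client id:

// Lifecycle illustration — method names match this diff, the scenario itself is assumed.
final Producer<byte[], byte[]> p1 = factory.takeProducer("publishing-small"); // hypothetical client id
factory.terminateProducer(p1); // e.g. after a broker list change: drop it from the active map
final Producer<byte[], byte[]> p2 = factory.takeProducer("publishing-small"); // a fresh instance is created
factory.releaseProducer(p1);   // last release of a terminated producer closes it
factory.releaseProducer(p2);   // p2 stays cached for the next takeProducer("publishing-small")
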
@@ -116,8 +116,11 @@ public Properties getKafkaConsumerProperties() {
return properties;
}

public Properties getKafkaProducerProperties() {
public Properties getKafkaProducerProperties(final String clientId) {
final Properties producerProps = (Properties) kafkaProperties.clone();

producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);

producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
@@ -135,6 +138,7 @@ public Properties getKafkaProducerProperties() {
producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaSettings.getMaxRequestSize());
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaSettings.getDeliveryTimeoutMs());
producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, kafkaSettings.getMaxBlockMs());
producerProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, kafkaSettings.getMetadataMaxAgeMs());
return producerProps;
}

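Injecting the client id into the producer properties also makes the per-bucket producers distinguishable in broker-side and client-side metrics. A minimal sanity check, assuming a KafkaLocationManager instance named locationManager and a made-up client id:

// Sketch only — verifies that the client id ends up in the producer configuration.
final Properties props = locationManager.getKafkaProducerProperties("publishing-large"); // hypothetical id
assert "publishing-large".equals(props.get(ProducerConfig.CLIENT_ID_CONFIG));
assert props.get(ProducerConfig.METADATA_MAX_AGE_CONFIG) != null; // new setting from this commit
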