diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java
index 83d5b5d6cd..f69d1bc1cf 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/repository/kafka/KafkaRepositoryAT.java
@@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.hasSize;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.zalando.nakadi.repository.kafka.KafkaTestHelper.createKafkaProperties;
 
 public class KafkaRepositoryAT extends BaseAT {
@@ -47,7 +48,6 @@ public class KafkaRepositoryAT extends BaseAT {
     private static final int ZK_SESSION_TIMEOUT = 30000;
     private static final int ZK_CONNECTION_TIMEOUT = 10000;
     private static final int ZK_MAX_IN_FLIGHT_REQUESTS = 1000;
-    private static final int ACTIVE_PRODUCERS_COUNT = 4;
     private static final int NAKADI_SEND_TIMEOUT = 10000;
     private static final int NAKADI_POLL_TIMEOUT = 10000;
     private static final Long DEFAULT_RETENTION_TIME = 100L;
@@ -57,6 +57,7 @@ public class KafkaRepositoryAT extends BaseAT {
     private static final int KAFKA_REQUEST_TIMEOUT = 30000;
     private static final int KAFKA_DELIVERY_TIMEOUT = 30000;
     private static final int KAFKA_MAX_BLOCK_TIMEOUT = 5000;
+    private static final int KAFKA_METADATA_MAX_AGE_MS = 1000;
     private static final String KAFKA_COMPRESSION_TYPE = "lz4";
     private static final int KAFKA_BATCH_SIZE = 1048576;
     private static final long KAFKA_BUFFER_MEMORY = KAFKA_BATCH_SIZE * 10L;
@@ -92,7 +93,6 @@ public void setup() {
                 DEFAULT_TOPIC_RETENTION,
                 DEFAULT_TOPIC_ROTATION,
                 DEFAULT_COMMIT_TIMEOUT,
-                ACTIVE_PRODUCERS_COUNT,
                 NAKADI_POLL_TIMEOUT,
                 NAKADI_SEND_TIMEOUT,
                 TIMELINE_WAIT_TIMEOUT,
@@ -109,7 +109,8 @@ public void setup() {
         kafkaSettings = new KafkaSettings(KAFKA_RETRIES, KAFKA_REQUEST_TIMEOUT, KAFKA_BATCH_SIZE, KAFKA_BUFFER_MEMORY,
                 KAFKA_LINGER_MS, KAFKA_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, KAFKA_ENABLE_AUTO_COMMIT,
-                KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE);
+                KAFKA_MAX_REQUEST_SIZE, KAFKA_DELIVERY_TIMEOUT, KAFKA_MAX_BLOCK_TIMEOUT, "", KAFKA_COMPRESSION_TYPE,
+                KAFKA_METADATA_MAX_AGE_MS);
         kafkaHelper = new KafkaTestHelper(KAFKA_URL);
         defaultTopicConfig = new NakadiTopicConfig(DEFAULT_PARTITION_COUNT, DEFAULT_CLEANUP_POLICY,
                 Optional.of(DEFAULT_RETENTION_TIME));
@@ -282,7 +283,11 @@ private KafkaTopicRepository createKafkaTopicRepository() {
         Mockito
                 .doReturn(kafkaHelper.createProducer())
                 .when(factory)
-                .takeProducer();
+                .takeDefaultProducer();
+        Mockito
+                .doReturn(kafkaHelper.createProducer())
+                .when(factory)
+                .takeProducer(anyString());
 
         return new KafkaTopicRepository.Builder()
                 .setKafkaZookeeper(kafkaZookeeper)
diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java
index 9a9769fe71..70a2636f50 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java
@@ -100,20 +100,27 @@ public void whenEventTypeRepartitionedTheNewSubscriptionShouldHaveUpdatedPartiti
         final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(1);
         NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
                 eventType.getName(), 1, x -> "{\"foo\":\"bar\"}", p -> "0");
+        NakadiTestUtils.repartitionEventType(eventType, 2);
+        Thread.sleep(1500);
+
         final Subscription subscription = createSubscription(
                 RandomSubscriptionBuilder.builder()
                         .withEventType(eventType.getName())
                         .withStartFrom(BEGIN)
                         .buildSubscriptionBase());
+
         final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
                 .create(URL, subscription.getId(), "")
                 .start();
+
         NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
                 eventType.getName(), 1, x -> "{\"foo\":\"bar" + x + "\"}", p -> "1");
+
         waitFor(() -> assertThat(clientAfterRepartitioning.getJsonBatches(), Matchers.hasSize(2)));
+
         Assert.assertTrue(clientAfterRepartitioning.getJsonBatches().stream()
-                .anyMatch(event -> event.getCursor().getPartition().equals("1")));
+                .anyMatch(batch -> batch.getCursor().getPartition().equals("1")));
     }
 
     @Test(timeout = 10000)
diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRepartitionAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRepartitionAT.java
index febe4c85b3..d511312967 100644
--- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRepartitionAT.java
+++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaRepartitionAT.java
@@ -118,6 +118,8 @@ public void whenEventTypeRepartitionedSubscriptionStartsStreamNewPartitions() th
         NakadiTestUtils.repartitionEventType(eventType, 2);
         TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false)));
 
+        Thread.sleep(1500);
+
         final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
                 .create(URL, subscription.getId(), "")
                 .startWithAutocommit(streamBatches -> LOG.info("{}", streamBatches));
@@ -153,9 +155,10 @@ public void shouldRepartitionTimelinedEventType() throws Exception {
         NakadiTestUtils.createTimeline(eventType.getName());
         NakadiTestUtils.repartitionEventType(eventType, 2);
-        TestUtils.waitFor(() -> MatcherAssert.assertThat(client.isRunning(), Matchers.is(false)));
+        Thread.sleep(1500);
+
         final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
                 .create(URL, subscription.getId(), "")
                 .startWithAutocommit(streamBatches -> LOG.info("{}", streamBatches));
diff --git a/api-metastore/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java b/api-metastore/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java
index 5a6f86adf4..1c10f57c9d 100644
--- a/api-metastore/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java
+++ b/api-metastore/src/test/java/org/zalando/nakadi/controller/EventTypeControllerTestCase.java
@@ -102,7 +102,7 @@ public EventTypeControllerTestCase() {
 
     @Before
     public void init() throws Exception {
-        final NakadiSettings nakadiSettings = new NakadiSettings(32, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, 1,
+        final NakadiSettings nakadiSettings = new NakadiSettings(32, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60,
                 NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, 0, NAKADI_EVENT_MAX_BYTES,
                 NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "org/zalando/nakadi", "I am warning you",
                 "I am warning you, even more", "nakadi_archiver", "nakadi_to_s3", 100, 10000);
diff --git a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java
index 9a5ff2407c..3d608b1cd6 100644
--- a/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java
+++ b/api-publishing/src/main/java/org/zalando/nakadi/EventPublishingController.java
@@ -35,6 +35,7 @@ import org.zalando.nakadi.service.publishing.BinaryEventPublisher;
 import org.zalando.nakadi.service.publishing.EventPublisher;
 import org.zalando.nakadi.service.publishing.NakadiKpiPublisher;
+import org.zalando.nakadi.util.SLOBuckets;
 
 import javax.servlet.http.HttpServletRequest;
 import java.io.IOException;
@@ -173,7 +174,7 @@ private ResponseEntity postBinaryEvents(final String eventTypeName,
         final int eventCount = result.getResponses().size();
         final long totalSizeBytes = countingInputStream.getCount();
-        TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));
+        TracingService.setTag("slo_bucket", SLOBuckets.getNameForBatchSize(totalSizeBytes));
 
         reportMetrics(eventTypeMetrics, result, totalSizeBytes, eventCount);
         reportSLOs(startingNanos, totalSizeBytes, eventCount, result, eventTypeName, client);
@@ -245,7 +246,7 @@ private ResponseEntity postEventInternal(final String eventTypeName,
         final EventPublishResult result;
 
         final int totalSizeBytes = eventsAsString.getBytes(Charsets.UTF_8).length;
-        TracingService.setTag("slo_bucket", TracingService.getSLOBucketName(totalSizeBytes));
+        TracingService.setTag("slo_bucket", SLOBuckets.getNameForBatchSize(totalSizeBytes));
 
         if (delete) {
             result = publisher.delete(eventsAsString, eventTypeName);
diff --git a/app/src/main/resources/application.yml b/app/src/main/resources/application.yml
index af77696b31..1b7d6e6b40 100644
--- a/app/src/main/resources/application.yml
+++ b/app/src/main/resources/application.yml
@@ -55,7 +55,6 @@ nakadi:
     maxConnections: 5
     maxStreamMemoryBytes: 50000000 # ~50 MB
   kafka:
-    producers.count: 1
     retries: 0
     request.timeout.ms: 30000
     instanceType: t2.large
@@ -72,6 +71,7 @@ nakadi:
     enable.auto.commit: false
    delivery.timeout.ms: 30000 # request.timeout.ms + linger.ms
     max.block.ms: 5000 # kafka default 60000
+    metadata.max.age.ms: 300000 # 5 min, default
   zookeeper:
     connectionString: zookeeper://zookeeper:2181
     sessionTimeoutMs: 10000
@@ -166,6 +166,8 @@ nakadi:
       AUDIT_LOG_COLLECTION: true
      EVENT_OWNER_SELECTOR_AUTHZ: false
       ACCESS_LOG_ENABLED: true
+    kafka:
+      metadata.max.age.ms: 1000
     kpi:
       config:
         stream-data-collection-frequency-ms: 100
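
Note on the new setting: metadata.max.age.ms caps how long a Kafka producer keeps its cached cluster metadata before proactively refreshing it, and therefore bounds how quickly a long-lived producer notices newly added partitions. Production keeps Kafka's five-minute default, while the ACCEPTANCE_TEST profile drops it to one second; that is also why the repartitioning tests above sleep for 1500 ms before streaming again. A minimal sketch of the effect, assuming a local broker on localhost:9092 and a pre-existing example-topic (both illustrative, not part of this change):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;

    import java.util.Properties;

    public class MetadataAgeSketch {
        public static void main(final String[] args) throws InterruptedException {
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000); // test-profile value
            try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
                System.out.println(producer.partitionsFor("example-topic").size());
                Thread.sleep(1500); // longer than metadata.max.age.ms, so a refresh has happened
                // if the topic was repartitioned in between, the new count shows up here
                System.out.println(producer.partitionsFor("example-topic").size());
            }
        }
    }
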
diff --git a/core-common/src/main/java/org/zalando/nakadi/config/NakadiSettings.java b/core-common/src/main/java/org/zalando/nakadi/config/NakadiSettings.java
index b94b6e3873..5d6d1b80b0 100644
--- a/core-common/src/main/java/org/zalando/nakadi/config/NakadiSettings.java
+++ b/core-common/src/main/java/org/zalando/nakadi/config/NakadiSettings.java
@@ -15,7 +15,6 @@ public class NakadiSettings {
     private final long defaultTopicRetentionMs;
     private final long defaultTopicRotationMs;
     private final long maxCommitTimeout;
-    private final int kafkaActiveProducersCount;
     private final long kafkaPollTimeoutMs;
     private final long kafkaSendTimeoutMs;
     private final long timelineWaitTimeoutMs;
@@ -36,7 +35,6 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
                           @Value("${nakadi.topic.default.retentionMs}") final long defaultTopicRetentionMs,
                           @Value("${nakadi.topic.default.rotationMs}") final long defaultTopicRotationMs,
                           @Value("${nakadi.stream.max.commitTimeout}") final long maxCommitTimeout,
-                          @Value("${nakadi.kafka.producers.count}") final int kafkaActiveProducersCount,
                           @Value("${nakadi.kafka.poll.timeoutMs}") final long kafkaPollTimeoutMs,
                           @Value("${nakadi.kafka.send.timeoutMs}") final long kafkaSendTimeoutMs,
                           @Value("${nakadi.timeline.wait.timeoutMs}") final long timelineWaitTimeoutMs,
@@ -60,7 +58,6 @@ public NakadiSettings(@Value("${nakadi.topic.max.partitionNum}") final int maxTo
         this.defaultTopicRetentionMs = defaultTopicRetentionMs;
         this.defaultTopicRotationMs = defaultTopicRotationMs;
         this.maxCommitTimeout = maxCommitTimeout;
-        this.kafkaActiveProducersCount = kafkaActiveProducersCount;
         this.kafkaPollTimeoutMs = kafkaPollTimeoutMs;
         this.kafkaSendTimeoutMs = kafkaSendTimeoutMs;
         this.eventMaxBytes = eventMaxBytes;
@@ -99,10 +96,6 @@ public long getMaxCommitTimeout() {
         return maxCommitTimeout;
     }
 
-    public int getKafkaActiveProducersCount() {
-        return kafkaActiveProducersCount;
-    }
-
     public long getKafkaPollTimeoutMs() {
         return kafkaPollTimeoutMs;
     }
diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java b/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java
index b342a97447..1ff1618635 100644
--- a/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java
+++ b/core-common/src/main/java/org/zalando/nakadi/repository/KafkaRepositoryCreator.java
@@ -64,8 +64,7 @@ public TopicRepository createTopicRepository(final Storage storage) throws Topic
                     zookeeperSettings.getZkConnectionTimeoutMs(), nakadiSettings);
             final KafkaLocationManager kafkaLocationManager = new KafkaLocationManager(zooKeeperHolder, kafkaSettings);
-            final KafkaFactory kafkaFactory = new KafkaFactory(kafkaLocationManager,
-                    nakadiSettings.getKafkaActiveProducersCount());
+            final KafkaFactory kafkaFactory = new KafkaFactory(kafkaLocationManager);
             final KafkaZookeeper zk = new KafkaZookeeper(zooKeeperHolder, objectMapper);
             final KafkaTopicRepository kafkaTopicRepository = new KafkaTopicRepository.Builder()
diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaFactory.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaFactory.java
index cfb840f038..fa88ee30a4 100644
--- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaFactory.java
+++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaFactory.java
@@ -13,56 +13,51 @@
 import javax.annotation.Nullable;
 import java.io.Closeable;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class KafkaFactory {
+    public static final String DEFAULT_PRODUCER_CLIENT_ID = "default";
+
     private static final Logger LOG = LoggerFactory.getLogger(KafkaFactory.class);
     private final KafkaLocationManager kafkaLocationManager;
-    private final Map<Producer<byte[], byte[]>, AtomicInteger> useCount;
+
     private final ReadWriteLock rwLock;
-    private final List<Producer<byte[], byte[]>> activeProducers;
-    private final AtomicLong activeProducerCounter;
+    private final Map<Producer<byte[], byte[]>, AtomicInteger> useCountByProducer;
+    private final Map<String, Producer<byte[], byte[]>> activeProducerByClientId;
 
-    public KafkaFactory(final KafkaLocationManager kafkaLocationManager, final int numActiveProducers) {
+    public KafkaFactory(final KafkaLocationManager kafkaLocationManager) {
         this.kafkaLocationManager = kafkaLocationManager;
-        this.useCount = new ConcurrentHashMap<>();
         this.rwLock = new ReentrantReadWriteLock();
-
-        this.activeProducers = new ArrayList<>(numActiveProducers);
-        for (int i = 0; i < numActiveProducers; ++i) {
-            this.activeProducers.add(null);
-        }
-
-        this.activeProducerCounter = new AtomicLong(0);
+        this.useCountByProducer = new ConcurrentHashMap<>();
+        this.activeProducerByClientId = new HashMap<>();
     }
 
     @Nullable
-    private Producer<byte[], byte[]> takeUnderLock(final int index, final boolean canCreate) {
+    private Producer<byte[], byte[]> takeUnderLock(final String clientId, final boolean canCreate) {
         final Lock lock = canCreate ? rwLock.writeLock() : rwLock.readLock();
         lock.lock();
         try {
-            Producer<byte[], byte[]> producer = activeProducers.get(index);
+            Producer<byte[], byte[]> producer = activeProducerByClientId.get(clientId);
             if (null != producer) {
-                useCount.get(producer).incrementAndGet();
+                useCountByProducer.get(producer).incrementAndGet();
                 return producer;
             } else if (canCreate) {
-                producer = createProducerInstance();
-                useCount.put(producer, new AtomicInteger(1));
-                activeProducers.set(index, producer);
+                producer = createProducerInstance(clientId);
+                useCountByProducer.put(producer, new AtomicInteger(1));
+                activeProducerByClientId.put(clientId, producer);
 
-                LOG.info("New producer instance created: " + producer);
+                LOG.info("New producer instance created with client id '{}': {}", clientId, producer);
                 return producer;
             } else {
                 return null;
@@ -78,16 +73,18 @@ private Producer takeUnderLock(final int index, final boolean ca
      *
      * @return Initialized kafka producer instance.
      */
-    public Producer<byte[], byte[]> takeProducer() {
-        final int index = (int) (activeProducerCounter.incrementAndGet() % activeProducers.size());
-
-        Producer<byte[], byte[]> result = takeUnderLock(index, false);
+    public Producer<byte[], byte[]> takeProducer(final String clientId) {
+        Producer<byte[], byte[]> result = takeUnderLock(clientId, false);
         if (null == result) {
-            result = takeUnderLock(index, true);
+            result = takeUnderLock(clientId, true);
         }
         return result;
     }
 
+    public Producer<byte[], byte[]> takeDefaultProducer() {
+        return takeProducer(DEFAULT_PRODUCER_CLIENT_ID);
+    }
+
     /**
      * Release kafka producer that was obtained by {@link #takeProducer()} method. If producer was not obtained by
      * {@link #takeProducer()} call - method will throw {@link NullPointerException}
      *
@@ -95,21 +92,21 @@ public Producer takeProducer() {
      * @param producer Producer to release.
      */
     public void releaseProducer(final Producer<byte[], byte[]> producer) {
-        final AtomicInteger counter = useCount.get(producer);
+        final AtomicInteger counter = useCountByProducer.get(producer);
         if (counter != null && 0 == counter.decrementAndGet()) {
             final boolean deleteProducer;
             rwLock.readLock().lock();
             try {
-                deleteProducer = !activeProducers.contains(producer);
+                deleteProducer = !activeProducerByClientId.containsValue(producer);
             } finally {
                 rwLock.readLock().unlock();
             }
             if (deleteProducer) {
                 rwLock.writeLock().lock();
                 try {
-                    if (counter.get() == 0 && null != useCount.remove(producer)) {
+                    if (counter.get() == 0 && null != useCountByProducer.remove(producer)) {
                         LOG.info("Stopping producer instance - It was reported that instance should be refreshed " +
-                                "and it is not used anymore: " + producer);
+                                "and it is not used anymore: {}", producer);
                         producer.close();
                     }
                 } finally {
@@ -129,14 +126,15 @@ public void releaseProducer(final Producer producer) {
      * @param producer Producer instance to terminate.
      */
     public void terminateProducer(final Producer<byte[], byte[]> producer) {
-        LOG.info("Received signal to terminate producer " + producer);
+        LOG.info("Received signal to terminate producer: {}", producer);
         rwLock.writeLock().lock();
         try {
-            final int index = activeProducers.indexOf(producer);
-            if (index >= 0) {
-                activeProducers.set(index, null);
-            } else {
-                LOG.info("Signal for producer termination already received: " + producer);
+            final Optional<String> clientId = activeProducerByClientId.entrySet().stream()
+                    .filter(kv -> kv.getValue() == producer)
+                    .map(Map.Entry::getKey)
+                    .findFirst();
+            if (clientId.isPresent()) {
+                activeProducerByClientId.remove(clientId.get());
             }
         } finally {
            rwLock.writeLock().unlock();
         }
@@ -210,8 +208,8 @@ public void close() {
         }
     }
 
-    protected Producer<byte[], byte[]> createProducerInstance() {
-        return new KafkaProducerCrutch(kafkaLocationManager.getKafkaProducerProperties(),
+    protected Producer<byte[], byte[]> createProducerInstance(final String clientId) {
+        return new KafkaProducerCrutch(kafkaLocationManager.getKafkaProducerProperties(clientId),
                 new KafkaCrutch(kafkaLocationManager));
     }
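
Taken together, the factory now keeps one cached, reference-counted producer per client id: takeProducer(clientId) returns the shared instance for that id (creating it under the write lock on first use), terminateProducer() merely unregisters a broken instance, and the final releaseProducer() call is what actually closes it. A sketch of the intended calling pattern, assuming a fully wired KafkaFactory (the wrapper method, topic and payload are illustrative):

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.zalando.nakadi.repository.kafka.KafkaFactory;

    public class ProducerUsageSketch {
        static void publish(final KafkaFactory factory, final byte[] key, final byte[] value) {
            final Producer<byte[], byte[]> producer = factory.takeProducer("5K-50K");
            try {
                producer.send(new ProducerRecord<>("example-topic", key, value)); // topic is an assumption
            } catch (final KafkaException e) {
                // mark the shared instance for replacement; it is closed once the
                // last concurrent user calls releaseProducer()
                factory.terminateProducer(producer);
                throw e;
            } finally {
                factory.releaseProducer(producer);
            }
        }
    }
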
diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java
index f0d67319df..298631e0dc 100644
--- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java
+++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaLocationManager.java
@@ -116,8 +116,11 @@ public Properties getKafkaConsumerProperties() {
         return properties;
     }
 
-    public Properties getKafkaProducerProperties() {
+    public Properties getKafkaProducerProperties(final String clientId) {
         final Properties producerProps = (Properties) kafkaProperties.clone();
+
+        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.ByteArraySerializer");
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
@@ -135,6 +138,7 @@ public Properties getKafkaProducerProperties() {
         producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, kafkaSettings.getMaxRequestSize());
         producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, kafkaSettings.getDeliveryTimeoutMs());
         producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, kafkaSettings.getMaxBlockMs());
+        producerProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, kafkaSettings.getMetadataMaxAgeMs());
         return producerProps;
     }
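
With the client id stamped into the properties, every distinct id passed to getKafkaProducerProperties() yields its own identifiable producer, so per-bucket producers can be told apart in broker-side request metrics, quotas and logs. A quick sanity sketch, assuming an initialized KafkaLocationManager (hypothetical check, not part of this change):

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.zalando.nakadi.repository.kafka.KafkaLocationManager;

    import java.util.Properties;

    public class ClientIdSketch {
        static void check(final KafkaLocationManager locationManager) {
            final Properties small = locationManager.getKafkaProducerProperties("<5K");
            final Properties large = locationManager.getKafkaProducerProperties(">50K");
            // the two property sets differ only in client.id
            assert "<5K".equals(small.get(ProducerConfig.CLIENT_ID_CONFIG));
            assert ">50K".equals(large.get(ProducerConfig.CLIENT_ID_CONFIG));
        }
    }
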
@Value("${nakadi.kafka.compression.type:lz4}") final String compressionType, + @Value("${nakadi.kafka.metadata.max.age.ms}") final int metadataMaxAgeMs) { this.retries = retries; this.requestTimeoutMs = requestTimeoutMs; this.batchSize = batchSize; @@ -51,6 +53,7 @@ public KafkaSettings(@Value("${nakadi.kafka.retries}") final int retries, this.maxBlockMs = maxBlockMs; this.clientRack = clientRack; this.compressionType = compressionType; + this.metadataMaxAgeMs = metadataMaxAgeMs; } public int getRetries() { @@ -100,4 +103,8 @@ public String getClientRack() { public String getCompressionType() { return compressionType; } + + public int getMetadataMaxAgeMs() { + return metadataMaxAgeMs; + } } diff --git a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java index bc88aa2ed1..4d165ece8d 100644 --- a/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java +++ b/core-common/src/main/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepository.java @@ -52,6 +52,7 @@ import org.zalando.nakadi.repository.NakadiTopicConfig; import org.zalando.nakadi.repository.TopicRepository; import org.zalando.nakadi.service.TracingService; +import org.zalando.nakadi.util.SLOBuckets; import javax.annotation.Nullable; import java.io.Closeable; @@ -218,21 +219,23 @@ public void repartition(final String topic, final int partitionsNumber) throws C TopicConfigException { try (AdminClient adminClient = AdminClient.create(kafkaLocationManager.getProperties())) { adminClient.createPartitions(ImmutableMap.of(topic, NewPartitions.increaseTo(partitionsNumber))); + final long timeoutMillis = TimeUnit.SECONDS.toMillis(5); final Boolean areNewPartitionsAdded = Retryer.executeWithRetry(() -> { try (Consumer consumer = kafkaFactory.getConsumer()) { - return consumer.partitionsFor(topic).size() == partitionsNumber; + final List partitions = consumer.partitionsFor(topic); + LOG.info("Repartitioning topic {} partitions: {}, expected: {}", + topic, partitions.size(), partitionsNumber); + return partitions.size() == partitionsNumber; } }, new RetryForSpecifiedTimeStrategy(timeoutMillis) - .withWaitBetweenEachTry(100L) + .withWaitBetweenEachTry(1000L) .withResultsThatForceRetry(Boolean.FALSE)); + if (!Boolean.TRUE.equals(areNewPartitionsAdded)) { throw new TopicConfigException(String.format("Failed to repartition topic to %s", partitionsNumber)); } - final Producer producer = kafkaFactory.takeProducer(); - kafkaFactory.terminateProducer(producer); - kafkaFactory.releaseProducer(producer); } catch (Exception e) { throw new CannotAddPartitionToTopicException(String .format("Failed to increase the number of partition for %s topic to %s", topic, @@ -304,7 +307,14 @@ public boolean topicExists(final String topic) throws TopicRepositoryException { public void syncPostBatch( final String topicId, final List batch, final String eventType, final boolean delete) throws EventPublishingException { - final Producer producer = kafkaFactory.takeProducer(); + + long totalRawSize = 0; + for (final BatchItem item : batch) { + totalRawSize += item.getEventSize(); + } + final String sloBucketName = SLOBuckets.getNameForBatchSize(totalRawSize); + + final Producer producer = kafkaFactory.takeProducer(sloBucketName); try { final Map partitionToBroker = producer.partitionsFor(topicId).stream() .filter(partitionInfo -> partitionInfo.leader() != null) @@ -443,7 +453,8 @@ private void logFailedEvents(final 
diff --git a/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java b/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java
index 4d2572553a..a4719d787d 100644
--- a/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java
+++ b/core-common/src/main/java/org/zalando/nakadi/service/TracingService.java
@@ -17,23 +17,8 @@
 import static io.opentracing.propagation.Format.Builtin.TEXT_MAP;
 
 public class TracingService {
-    private static final String BUCKET_NAME_5_KB = "<5K";
-    private static final String BUCKET_NAME_5_50_KB = "5K-50K";
-    private static final String BUCKET_NAME_MORE_THAN_50_KB = ">50K";
-
-    private static final long BUCKET_5_KB = 5000L;
-    private static final long BUCKET_50_KB = 50000L;
 
     public static final String ERROR_DESCRIPTION = "error.description";
 
-    public static String getSLOBucketName(final long batchSize) {
-        if (batchSize > BUCKET_50_KB) {
-            return BUCKET_NAME_MORE_THAN_50_KB;
-        } else if (batchSize < BUCKET_5_KB) {
-            return BUCKET_NAME_5_KB;
-        }
-        return BUCKET_NAME_5_50_KB;
-    }
-
     public static Tracer.SpanBuilder buildNewSpan(final String operationName) {
         return GlobalTracer.get().buildSpan(operationName);
     }
diff --git a/core-common/src/main/java/org/zalando/nakadi/util/SLOBuckets.java b/core-common/src/main/java/org/zalando/nakadi/util/SLOBuckets.java
new file mode 100644
index 0000000000..7a88fe95c9
--- /dev/null
+++ b/core-common/src/main/java/org/zalando/nakadi/util/SLOBuckets.java
@@ -0,0 +1,19 @@
+package org.zalando.nakadi.util;
+
+public class SLOBuckets {
+    private static final String BUCKET_NAME_5_KB = "<5K";
+    private static final String BUCKET_NAME_5_50_KB = "5K-50K";
+    private static final String BUCKET_NAME_MORE_THAN_50_KB = ">50K";
+
+    private static final long BUCKET_5_KB = 5000L;
+    private static final long BUCKET_50_KB = 50000L;
+
+    public static String getNameForBatchSize(final long batchSize) {
+        if (batchSize > BUCKET_50_KB) {
+            return BUCKET_NAME_MORE_THAN_50_KB;
+        } else if (batchSize < BUCKET_5_KB) {
+            return BUCKET_NAME_5_KB;
+        }
+        return BUCKET_NAME_5_50_KB;
+    }
+}
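
Both threshold comparisons are strict, so batch sizes of exactly 5000 and 50000 bytes land in the middle bucket. A minimal boundary check (a hypothetical test, not part of this change):

    import org.junit.Assert;
    import org.junit.Test;
    import org.zalando.nakadi.util.SLOBuckets;

    public class SLOBucketsBoundaryTest {
        @Test
        public void boundaryValuesFallIntoTheMiddleBucket() {
            Assert.assertEquals("<5K", SLOBuckets.getNameForBatchSize(4999L));
            Assert.assertEquals("5K-50K", SLOBuckets.getNameForBatchSize(5000L));
            Assert.assertEquals("5K-50K", SLOBuckets.getNameForBatchSize(50000L));
            Assert.assertEquals(">50K", SLOBuckets.getNameForBatchSize(50001L));
        }
    }
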
diff --git a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaFactoryTest.java b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaFactoryTest.java
index 5a824a6c57..84e7203bea 100644
--- a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaFactoryTest.java
+++ b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaFactoryTest.java
@@ -12,27 +12,27 @@ public class KafkaFactoryTest {
 
     private static class FakeKafkaFactory extends KafkaFactory {
 
-        FakeKafkaFactory(final int numActiveProducers) {
-            super(null, numActiveProducers);
+        FakeKafkaFactory() {
+            super(null);
         }
 
         @Override
-        protected Producer<byte[], byte[]> createProducerInstance() {
+        protected Producer<byte[], byte[]> createProducerInstance(final String clientId) {
             return Mockito.mock(Producer.class);
         }
     }
 
     @Test
     public void whenSingleProducerThenTheSameProducerIsGiven() {
-        final KafkaFactory factory = new FakeKafkaFactory(1);
-        final Producer<byte[], byte[]> producer1 = factory.takeProducer();
+        final KafkaFactory factory = new FakeKafkaFactory();
+        final Producer<byte[], byte[]> producer1 = factory.takeDefaultProducer();
         try {
             Assert.assertNotNull(producer1);
         } finally {
             factory.releaseProducer(producer1);
         }
 
-        final Producer<byte[], byte[]> producer2 = factory.takeProducer();
+        final Producer<byte[], byte[]> producer2 = factory.takeDefaultProducer();
         try {
             Assert.assertSame(producer1, producer2);
         } finally {
@@ -42,10 +42,10 @@ public void whenSingleProducerThenTheSameProducerIsGiven() {
 
     @Test
     public void verifySingleProducerIsClosedAtCorrectTime() {
-        final KafkaFactory factory = new FakeKafkaFactory(1);
+        final KafkaFactory factory = new FakeKafkaFactory();
 
         final List<Producer<byte[], byte[]>> producers1 = IntStream.range(0, 10)
-                .mapToObj(ignore -> factory.takeProducer()).collect(Collectors.toList());
+                .mapToObj(ignore -> factory.takeDefaultProducer()).collect(Collectors.toList());
         final Producer<byte[], byte[]> producer = producers1.get(0);
         Assert.assertNotNull(producer);
         producers1.forEach(p -> Assert.assertSame(producer, p));
@@ -55,8 +55,8 @@ public void verifySingleProducerIsClosedAtCorrectTime() {
 
         final List<Producer<byte[], byte[]>> producers2 = IntStream.range(0, 10)
-                .mapToObj(ignore -> factory.takeProducer()).collect(Collectors.toList());
-        final Producer<byte[], byte[]> additionalProducer = factory.takeProducer();
+                .mapToObj(ignore -> factory.takeDefaultProducer()).collect(Collectors.toList());
+        final Producer<byte[], byte[]> additionalProducer = factory.takeDefaultProducer();
         Assert.assertSame(producer, additionalProducer);
         producers2.forEach(p -> Assert.assertSame(producer, p));
@@ -73,14 +73,14 @@
 
     @Test
     public void verifyNewProducerCreatedAfterCloseOfSingle() {
-        final KafkaFactory factory = new FakeKafkaFactory(1);
-        final Producer<byte[], byte[]> producer1 = factory.takeProducer();
+        final KafkaFactory factory = new FakeKafkaFactory();
+        final Producer<byte[], byte[]> producer1 = factory.takeDefaultProducer();
         Assert.assertNotNull(producer1);
         factory.terminateProducer(producer1);
         factory.releaseProducer(producer1);
         Mockito.verify(producer1, Mockito.times(1)).close();
 
-        final Producer<byte[], byte[]> producer2 = factory.takeProducer();
+        final Producer<byte[], byte[]> producer2 = factory.takeDefaultProducer();
         Assert.assertNotNull(producer2);
         Assert.assertNotSame(producer1, producer2);
         factory.releaseProducer(producer2);
@@ -88,13 +88,24 @@
     }
 
     @Test
-    public void testGoldenPathWithManyActiveProducers() {
-        final KafkaFactory factory = new FakeKafkaFactory(4);
+    public void testTakingProducerForTheSameOrDifferentKey() {
+        final KafkaFactory factory = new FakeKafkaFactory();
 
-        final List<Producer<byte[], byte[]>> producers = IntStream.range(0, 10)
-                .mapToObj(ignore -> factory.takeProducer()).collect(Collectors.toList());
+        final Producer<byte[], byte[]> producer1 = factory.takeProducer("key1");
+        Assert.assertNotNull(producer1);
+
+        final Producer<byte[], byte[]> producer2 = factory.takeProducer("key2");
+        Assert.assertNotNull(producer2);
+
+        Assert.assertNotSame(producer1, producer2);
 
-        producers.forEach(Assert::assertNotNull);
-        producers.forEach(factory::releaseProducer);
+        final Producer<byte[], byte[]> producer3 = factory.takeProducer("key1");
+        Assert.assertNotNull(producer3);
+
+        Assert.assertSame(producer3, producer1);
+
+        factory.releaseProducer(producer1);
+        factory.releaseProducer(producer2);
+        factory.releaseProducer(producer3);
     }
 }
diff --git a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java
index a9859b3905..f3e71555ea 100644
--- a/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java
+++ b/core-common/src/test/java/org/zalando/nakadi/repository/kafka/KafkaTopicRepositoryTest.java
@@ -119,6 +119,7 @@ private enum ConsumerOffsetMode {
 
     private final KafkaTopicRepository kafkaTopicRepository;
     private final KafkaProducer kafkaProducer;
+    private final KafkaProducer defaultKafkaProducer;
     private final KafkaFactory kafkaFactory;
 
     @SuppressWarnings("unchecked")
@@ -127,6 +128,12 @@ public KafkaTopicRepositoryTest() throws IOException {
         when(kafkaProducer.partitionsFor(anyString())).then(
                 invocation -> partitionsOfTopic((String) invocation.getArguments()[0])
         );
+
+        defaultKafkaProducer = mock(KafkaProducer.class);
+        when(defaultKafkaProducer.partitionsFor(anyString())).then(
+                invocation -> partitionsOfTopic((String) invocation.getArguments()[0])
+        );
+
         nakadiRecordMapper = TestUtils.getNakadiRecordMapper();
         kafkaFactory = createKafkaFactory();
         kafkaTopicRepository = createKafkaRepository(kafkaFactory);
@@ -449,7 +456,7 @@ public void testSendNakadiRecordsOk() {
         final var nakadiRecord = getTestNakadiRecord("0");
         final List nakadiRecords = Lists.newArrayList(nakadiRecord, nakadiRecord, nakadiRecord);
 
-        when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> {
+        when(defaultKafkaProducer.send(any(), any())).thenAnswer(invocation -> {
             final Callback callback = (Callback) invocation.getArguments()[1];
             callback.onCompletion(null, null);
             return null;
@@ -471,7 +478,7 @@ public void testSendNakadiRecordsHalfPublished() throws IOException {
                 getTestNakadiRecord("3"));
 
         final Exception exception = new Exception();
-        when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> {
+        when(defaultKafkaProducer.send(any(), any())).thenAnswer(invocation -> {
             final ProducerRecord record = (ProducerRecord) invocation.getArguments()[0];
             final Callback callback = (Callback) invocation.getArguments()[1];
             if (record.partition() % 2 == 0) {
@@ -505,7 +512,7 @@ public void testSendNakadiRecordsHalfSubmitted() throws IOException {
                 getTestNakadiRecord("3"));
 
         final KafkaException exception = new KafkaException();
-        when(kafkaProducer.send(any(), any())).thenAnswer(invocation -> {
+        when(defaultKafkaProducer.send(any(), any())).thenAnswer(invocation -> {
             final ProducerRecord record = (ProducerRecord) invocation.getArguments()[0];
             final Callback callback = (Callback) invocation.getArguments()[1];
             if (record.partition() <= 1) {
@@ -596,7 +603,8 @@ private KafkaFactory createKafkaFactory() {
         when(kafkaFactory.getConsumer(KAFKA_CLIENT_ID)).thenReturn(consumer);
         when(kafkaFactory.getConsumer()).thenReturn(consumer);
 
-        when(kafkaFactory.takeProducer()).thenReturn(kafkaProducer);
+        when(kafkaFactory.takeDefaultProducer()).thenReturn(defaultKafkaProducer);
+        when(kafkaFactory.takeProducer(anyString())).thenReturn(kafkaProducer);
 
         return kafkaFactory;
     }
diff --git a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java
index 0b8348151a..e0f8c7ce87 100644
--- a/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java
+++ b/core-services/src/test/java/org/zalando/nakadi/service/publishing/EventPublisherTest.java
@@ -91,7 +91,7 @@ public class EventPublisherTest {
     protected final Enrichment enrichment = mock(Enrichment.class);
     protected final AuthorizationValidator authzValidator = mock(AuthorizationValidator.class);
     protected final TimelineService timelineService = Mockito.mock(TimelineService.class);
-    protected final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60, 1,
+    protected final NakadiSettings nakadiSettings = new NakadiSettings(0, 0, 0, TOPIC_RETENTION_TIME_MS, 0, 60,
             NAKADI_POLL_TIMEOUT, NAKADI_SEND_TIMEOUT, TIMELINE_WAIT_TIMEOUT_MS, NAKADI_EVENT_MAX_BYTES,
             NAKADI_SUBSCRIPTION_MAX_PARTITIONS, "service", "org/zalando/nakadi", "", "",
             "nakadi_archiver", "nakadi_to_s3", 100, 10000);