Skip to content

Commit

Permalink
Merge pull request #3 from Valensas/feature/header-propagation-tests
Browse files Browse the repository at this point in the history
Feature/header propagation tests
  • Loading branch information
yavuzselimkaraman authored Sep 18, 2024
2 parents b82a7b9 + ac4f3a2 commit c85719b
Show file tree
Hide file tree
Showing 9 changed files with 321 additions and 0 deletions.
6 changes: 6 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ dependencies {
compileOnly("org.springframework.boot:spring-boot-starter-web")

compileOnly("com.valensas:kafka:0.3.0")

testImplementation("org.springframework.boot:spring-boot-starter-web")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("com.valensas:kafka:0.3.0")
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("org.springframework.cloud:spring-cloud-starter-openfeign:4.1.1")
}

tasks.withType<KotlinCompile> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.valensas.headerpropagation

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.cloud.openfeign.EnableFeignClients

@SpringBootApplication
@EnableFeignClients
class HeaderPropagationTestApplication
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.valensas.headerpropagation.client

import com.valensas.headerpropagation.util.TestMessage
import org.springframework.cloud.openfeign.FeignClient
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestHeader

@FeignClient(name = "testFeignClient", url = "\${test.feign.client.url}")
interface TestFeignClient {
@PostMapping("/test/feign-to-kafka")
fun postFeignToKafka(
@RequestBody testMessage: TestMessage,
@RequestHeader headers: Map<String, String>
): ResponseEntity<Unit>

@PostMapping("/test/feign-to-kafka-publish")
fun postFeignToKafkaAndPublish(): ResponseEntity<Unit>

@PostMapping("/test/kafka-to-feign")
fun postKafkaToFeign(): ResponseEntity<Unit>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.valensas.headerpropagation.controller

import com.valensas.headerpropagation.client.TestFeignClient
import com.valensas.headerpropagation.service.KafkaProducerService
import com.valensas.headerpropagation.util.Fake
import com.valensas.headerpropagation.util.TestMessage
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestHeader
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/test")
class TestController(
private val kafkaProducerService: KafkaProducerService,
private val testFeignClient: TestFeignClient
) {
@PostMapping("/rest-to-kafka")
fun postRestToKafka(): ResponseEntity<Unit> {
kafkaProducerService.publishMessage("rest-to-kafka-output", Fake.testMessage())
return ResponseEntity.ok().build()
}

@PostMapping("/kafka-to-rest")
fun postKafkaToRest(): ResponseEntity<Unit> {
kafkaProducerService.publishMessage("kafka-to-rest-output", Fake.testMessage())
return ResponseEntity.ok().build()
}

@PostMapping("/feign-to-kafka")
fun postFeignToKafkaAndPost(
@RequestBody testMessage: TestMessage,
@RequestHeader headers: Map<String, String>
): ResponseEntity<Unit> {
testFeignClient.postFeignToKafkaAndPublish()
return ResponseEntity.ok().build()
}

@PostMapping("/feign-to-kafka-publish")
fun postFeignToKafkaAndPublish(): ResponseEntity<Unit> {
kafkaProducerService.publishMessage("feign-to-kafka-output", Fake.testMessage())
return ResponseEntity.ok().build()
}

@PostMapping("/kafka-to-feign")
fun postKafkaToFeign(): ResponseEntity<Unit> {
kafkaProducerService.publishMessage("kafka-to-feign-output", Fake.testMessage())
return ResponseEntity.ok().build()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.valensas.headerpropagation.service

import com.valensas.headerpropagation.client.TestFeignClient
import com.valensas.headerpropagation.util.TestMessage
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.test.web.client.TestRestTemplate
import org.springframework.http.HttpEntity
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Service

@Service
@ConditionalOnProperty("spring.kafka.consumer.enabled", havingValue = "true")
class KafkaConsumerService(
private val kafkaProducerService: KafkaProducerService,
private val testRestTemplate: TestRestTemplate,
private val testFeignClient: TestFeignClient
) {
@KafkaListener(topics = ["kafka-to-kafka-input"])
fun consumeKafkaToKafkaInput(message: TestMessage) {
kafkaProducerService.publishMessage("kafka-to-kafka-output", message)
}

@KafkaListener(topics = ["kafka-to-feign-input"])
fun consumeKafkaToFeignInput() {
testFeignClient.postKafkaToFeign()
}

@KafkaListener(topics = ["kafka-to-rest-input"])
fun consumeKafkaToRestInput(message: TestMessage) {
val body = HttpEntity(message)
testRestTemplate.postForEntity("/test/kafka-to-rest", body, String::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.valensas.headerpropagation.service

import org.apache.kafka.clients.producer.ProducerRecord
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service

@Service
class KafkaProducerService(
private val kafkaTemplate: KafkaTemplate<String, Any>
) {
fun publishMessage(
topic: String,
message: Any
) {
val producerRecord = ProducerRecord<String, Any>(topic, message)
kafkaTemplate.send(producerRecord)
}
}
109 changes: 109 additions & 0 deletions src/test/kotlin/com/valensas/headerpropagation/test/IntegrationTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package com.valensas.headerpropagation.test

import com.valensas.headerpropagation.client.TestFeignClient
import com.valensas.headerpropagation.config.ThreadLocalHeaderStore
import com.valensas.headerpropagation.util.Fake
import com.valensas.headerpropagation.util.TestMessage
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.boot.test.web.client.TestRestTemplate
import org.springframework.http.HttpEntity
import org.springframework.http.HttpHeaders
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.KafkaTestUtils
import org.springframework.test.context.ActiveProfiles

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT, properties = ["server.port=13333"])
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
@EmbeddedKafka(
partitions = 1,
topics = [
"kafka-to-kafka-input", "kafka-to-kafka-output", "rest-to-kafka-output", "kafka-to-rest-input",
"kafka-to-rest-output", "feign-to-kafka-output", "kafka-to-feign-input", "kafka-to-feign-output"
]
)
class IntegrationTest(
@Autowired private val restTemplate: TestRestTemplate,
@Autowired private val testFeignClient: TestFeignClient
) {
@Autowired
private lateinit var kafkaTemplate: KafkaTemplate<String, TestMessage>

@Autowired
private lateinit var embeddedKafkaBroker: EmbeddedKafkaBroker

@Test
fun `should propagate headers from Kafka to Kafka`() {
val consumer = Fake.consumer(embeddedKafkaBroker)
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "kafka-to-kafka-output")

ThreadLocalHeaderStore.headers = mapOf(Fake.taskKey to Fake.taskValue)

kafkaTemplate.send(ProducerRecord("kafka-to-kafka-input", Fake.testMessage()))
val record = KafkaTestUtils.getSingleRecord(consumer, "kafka-to-kafka-output")

val headerValue = record.headers().headers(Fake.taskKey).iterator().next().value()
assertEquals(Fake.taskValue, String(headerValue))
}

@Test
fun `should propagate headers from Rest to Kafka`() {
val consumer = Fake.consumer(embeddedKafkaBroker)
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "rest-to-kafka-output")

val requestEntity = HttpEntity(null, HttpHeaders().also { it.set(Fake.taskKey, Fake.taskValue) })

restTemplate.postForEntity("/test/rest-to-kafka", requestEntity, String::class.java)
val record = KafkaTestUtils.getSingleRecord(consumer, "rest-to-kafka-output")

val headerValue = record.headers().headers(Fake.taskKey).iterator().next().value()
assertEquals(Fake.taskValue, String(headerValue))
}

@Test
fun `should propagate headers from Kafka to Rest`() {
val consumer = Fake.consumer(embeddedKafkaBroker)
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "kafka-to-rest-output")

ThreadLocalHeaderStore.headers = mapOf(Fake.taskKey to Fake.taskValue)

kafkaTemplate.send(ProducerRecord("kafka-to-rest-input", Fake.testMessage()))
val record = KafkaTestUtils.getSingleRecord(consumer, "kafka-to-rest-output")

val headerValue = record.headers().headers(Fake.taskKey).iterator().next().value()
assertEquals(Fake.taskValue, String(headerValue))
}

@Test
fun `should propagate headers from Feign to Kafka`() {
val consumer = Fake.consumer(embeddedKafkaBroker)
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "feign-to-kafka-output")

testFeignClient.postFeignToKafka(Fake.testMessage(), mapOf(Fake.taskKey to Fake.taskValue))
val record = KafkaTestUtils.getSingleRecord(consumer, "feign-to-kafka-output")

val headerValue = record.headers().headers(Fake.taskKey).iterator().next().value()
assertEquals(Fake.taskValue, String(headerValue))
}

@Test
fun `should propagate headers from Kafka to Feign`() {
val consumer = Fake.consumer(embeddedKafkaBroker)
embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "kafka-to-feign-output")

ThreadLocalHeaderStore.headers = mapOf(Fake.taskKey to Fake.taskValue)

kafkaTemplate.send(ProducerRecord("kafka-to-feign-input", Fake.testMessage()))
val record = KafkaTestUtils.getSingleRecord(consumer, "kafka-to-feign-output")

val headerValue = record.headers().headers(Fake.taskKey).iterator().next().value()
assertEquals(Fake.taskValue, String(headerValue))
}
}
35 changes: 35 additions & 0 deletions src/test/kotlin/com/valensas/headerpropagation/util/Fake.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package com.valensas.headerpropagation.util

import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.utils.KafkaTestUtils
import java.time.Instant
import kotlin.random.Random

object Fake {
fun consumer(embeddedKafkaBroker: EmbeddedKafkaBroker): Consumer<String, TestMessage> {
val consumerProps =
KafkaTestUtils.consumerProps("testGroup-${Instant.now()}", "true", embeddedKafkaBroker).apply {
this[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
this[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
this[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
this[JsonDeserializer.TRUSTED_PACKAGES] = "*"
}

val consumerFactory = DefaultKafkaConsumerFactory<String, TestMessage>(consumerProps)
return consumerFactory.createConsumer()
}

fun testMessage(): TestMessage = TestMessage(Random.nextLong())

val taskKey = "task-name"
val taskValue = "test"
}

data class TestMessage(
val id: Long
)
37 changes: 37 additions & 0 deletions src/test/resources/application-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
spring:
application:
name: test-application
kafka:
producer:
bootstrap-servers: ${kafka.endpoint}
consumer:
auto-offset-reset: earliest
bootstrap-servers: ${kafka.endpoint}
group-id: test-group
enabled: true

springwolf:
docket:
base-package: com.valensas
info:
title: ${spring.application.name}
version: 0.1.2
servers:
kafka:
host: localhost:9092
protocol: kafka

kafka:
endpoint: ${spring.embedded.kafka.brokers}

header-propagation:
feign.enabled: true
rest.enabled: true
kafka.enabled: true
headers:
- task-name

test:
feign:
client:
url: "http://localhost:13333"

0 comments on commit c85719b

Please sign in to comment.