Skip to content

Commit

Permalink
transmit key from input topic to output topic (#3578)
Browse files Browse the repository at this point in the history
  • Loading branch information
eastmon94 authored Sep 15, 2021
1 parent 8988f5c commit ab1868b
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 0 deletions.
1 change: 1 addition & 0 deletions executor/api/kafka/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func (ks *SeldonKafkaServer) Serve() error {

job := KafkaJob{
headers: headers,
reqKey: e.Key,
reqPayload: reqPayload,
}
// enqueue a job
Expand Down
2 changes: 2 additions & 0 deletions executor/api/kafka/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type KafkaJob struct {
headers map[string][]string
reqKey []byte
reqPayload payload.SeldonPayload
}

Expand Down Expand Up @@ -62,6 +63,7 @@ func (ks *SeldonKafkaServer) processKafkaRequest(job *KafkaJob) {

err = ks.Producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &ks.TopicOut, Partition: kafka.PartitionAny},
Key: job.reqKey,
Value: resBytes,
Headers: kafkaHeaders,
}, nil)
Expand Down

0 comments on commit ab1868b

Please sign in to comment.