This repository contains another KafkaChannel implementation, which we believe is more flexible and scalable.
While working on other KafkaChannel implementations, we realized we could reuse the dataplane for other Knative Kafka components, namely the Knative KafkaBroker and the Knative KafkaSink.
The dataplane is designed so that it communicates with the control plane only via a configmap, and thus it is:
- More loosely-coupled
- Easier to test standalone
- Using a different stack that we find more suitable for the dataplane
Note: The consolidated and distributed KafkaChannel implementations are deprecated.
The following table compares this new KafkaChannel implementation with the deprecated consolidated and distributed KafkaChannel implementations.
| | New KafkaChannel | Consolidated KafkaChannel (Not supported anymore) | Distributed KafkaChannel (Not supported anymore) |
|---|---|---|---|
| Delivery Guarantees | At least once | At least once | At least once |
| Scalability | Ingress and egress are in different pods and scalable** | Ingress and egress are in the same pod and not scalable | Ingress and egress are in different pods but not scalable |
| Data plane multitenancy | All KafkaChannel resources leverage the same dataplane | All KafkaChannel resources leverage the same dataplane | All KafkaChannel resources leverage the same ingress but each has a dedicated egress |
| Offset repositioning | Not supported | Not supported | Supported |
| Dataplane stack | Java (Vert.x) | Go (Sarama) | Go (Sarama) |

** Work in progress
- Set up Knative Eventing.
- Install an Apache Kafka cluster, if you have not done so already.
  For Kubernetes, a simple installation is done using the Strimzi Kafka Operator. Its installation guides provide content for Kubernetes and OpenShift.
  Note: This KafkaChannel is not limited to Apache Kafka installations on Kubernetes. It is also possible to use an off-cluster Apache Kafka installation.
- Apply the channel manifests that are listed as assets in releases:
      # The whole suite
      kubectl apply -f eventing-kafka.yaml
      # Or, only the KafkaChannel
      kubectl apply -f eventing-kafka-controller.yaml
      kubectl apply -f eventing-kafka-channel.yaml
      # Then apply the post-install job (if available)
      kubectl apply -f eventing-kafka-post-install.yaml
- Configure the bootstrap.servers value in the kafka-channel-config ConfigMap in the knative-eventing namespace.
      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kafka-channel-config
        namespace: knative-eventing
      data:
        # Replace this with the URLs for your kafka cluster,
        # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
        bootstrap.servers: "REPLACE_WITH_CLUSTER_URL"
- If you are going to use authentication or encryption, create your secret.
  For using authentication, these values must exist in the secret:
  - sasl.mechanism: Can be one of PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512. See Apache Kafka SASL configuration documentation for more information.
  - user: Username to use in the authentication context. See Apache Kafka SASL configuration documentation for more information.
  - password: Password to use in the authentication context. See Apache Kafka SASL configuration documentation for more information.
  For using encryption, these values must exist in the secret:
  - ca.crt: Certificate authority certificate. See Apache Kafka SSL configuration documentation for more information.
  - user.crt: User certificate. See Apache Kafka SSL configuration documentation for more information.
  - user.key: User certificate key. See Apache Kafka SSL configuration documentation for more information.
  In any case, the protocol value must be set explicitly to one of:
  - PLAINTEXT: Neither authentication nor encryption is used
  - SSL: Only encryption is used
  - SASL_PLAINTEXT: Only authentication is used
  - SASL_SSL: Both authentication and encryption are used
      ca_cert_secret="my-cluster-cluster-ca-cert"
      tls_user="my-tls-user"
      sasl_user="my-sasl-user"

      STRIMZI_CRT=$(kubectl -n kafka get secret "${ca_cert_secret}" --template='{{index .data "ca.crt"}}' | base64 --decode )
      SASL_PASSWD=$(kubectl -n kafka get secret "${sasl_user}" --template='{{index .data "password"}}' | base64 --decode )
      TLSUSER_CRT=$(kubectl -n kafka get secret "${tls_user}" --template='{{index .data "user.crt"}}' | base64 --decode )
      TLSUSER_KEY=$(kubectl -n kafka get secret "${tls_user}" --template='{{index .data "user.key"}}' | base64 --decode )

      # SSL without SASL authentication
      kubectl create secret --namespace knative-eventing generic strimzi-tls-secret \
        --from-literal=ca.crt="$STRIMZI_CRT" \
        --from-literal=user.crt="$TLSUSER_CRT" \
        --from-literal=user.key="$TLSUSER_KEY" \
        --from-literal=protocol="SSL" \
        --dry-run=client -o yaml | kubectl apply -n knative-eventing -f -

      # Or, SSL with SASL authentication
      kubectl create secret --namespace knative-eventing generic strimzi-sasl-secret \
        --from-literal=ca.crt="$STRIMZI_CRT" \
        --from-literal=password="$SASL_PASSWD" \
        --from-literal=user="my-sasl-user" \
        --from-literal=protocol="SASL_SSL" \
        --from-literal=sasl.mechanism="SCRAM-SHA-512" \
        --dry-run=client -o yaml | kubectl apply -n knative-eventing -f -

      # Or, no SSL with SASL authentication
      kubectl create secret --namespace knative-eventing generic strimzi-sasl-plain-secret \
        --from-literal=password="$SASL_PASSWD" \
        --from-literal=user="my-sasl-user" \
        --from-literal=protocol="SASL_PLAINTEXT" \
        --from-literal=sasl.mechanism="SCRAM-SHA-512" \
        --dry-run=client -o yaml | kubectl apply -n knative-eventing -f -
- Configure the auth.secret.ref.name and auth.secret.ref.namespace values in the kafka-channel-config ConfigMap in the knative-eventing namespace.
      apiVersion: v1
      kind: ConfigMap
      metadata:
        name: kafka-channel-config
        namespace: knative-eventing
      data:
        # Replace this with the URLs for your kafka cluster,
        # which is in the format of my-cluster-kafka-bootstrap.my-kafka-namespace:9092.
        bootstrap.servers: "REPLACE_WITH_CLUSTER_URL"
        # Replace with secret name, such as strimzi-sasl-secret as created above
        auth.secret.ref.name: REPLACE_WITH_SECRET_NAME
        auth.secret.ref.namespace: knative-eventing
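If you would rather patch the existing ConfigMap than re-apply the whole manifest, the two auth keys can also be set with a merge patch. The following is only a minimal sketch: the helper name build_auth_patch is hypothetical, and strimzi-sasl-secret is simply the secret name used in the example above.

```shell
#!/bin/sh
# Hypothetical helper: builds a JSON merge patch that sets the two
# auth.secret.ref.* keys of the kafka-channel-config ConfigMap.
# $1 = secret name, $2 = secret namespace
build_auth_patch() {
  printf '{"data":{"auth.secret.ref.name":"%s","auth.secret.ref.namespace":"%s"}}' "$1" "$2"
}

# Print the patch so it can be reviewed before it is applied.
build_auth_patch strimzi-sasl-secret knative-eventing
echo

# To apply it against a cluster (not executed in this sketch):
# kubectl patch configmap kafka-channel-config -n knative-eventing \
#   --type merge -p "$(build_auth_patch strimzi-sasl-secret knative-eventing)"
```

Keeping the patch in a small function makes it easy to inspect the JSON before it reaches the cluster.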
You can create KafkaChannel custom objects like this:
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
  name: my-kafka-channel
spec:
  numPartitions: 1
  replicationFactor: 1
  retentionDuration: PT168H
You can configure the number of partitions with numPartitions, as well as the replication factor with replicationFactor, and the Kafka message retention with retentionDuration. If not set, these will be defaulted by the WebHook to 1, 1, and PT168H respectively.
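The retentionDuration value is written as an ISO-8601 duration, so the default PT168H means 168 hours, that is, 7 days. As a small sketch, an hour-based value can be derived from a number of days as follows; the helper name retention_for_days is hypothetical and not part of any Knative tooling.

```shell
#!/bin/sh
# Hypothetical helper: converts a whole number of days into an
# hour-based ISO-8601 duration string such as PT168H.
# $1 = number of days
retention_for_days() {
  echo "PT$(( $1 * 24 ))H"
}

retention_for_days 7   # prints PT168H, the default retention
```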
Note: The consolidated and distributed KafkaChannel implementations are not supported anymore. Please migrate to the new implementation.
There are a few breaking changes; however, automated migration is possible from the consolidated channel implementation to the new implementation.
Distributed channel users need to migrate manually.
The new implementation uses a new configmap called kafka-channel-config, which also has a different structure.
The consolidated channel config looks like this:
apiVersion: v1
kind: ConfigMap
metadata:
  name: config-kafka
  namespace: knative-eventing
data:
  version: 1.0.0
  sarama: |
    enableLogging: false
    config: |
      Version: 2.0.0 # Kafka Version Compatibility From Sarama's Supported List (Major.Minor.Patch)
      Admin:
        Timeout: 10000000000 # 10 seconds
      Net:
        KeepAlive: 30000000000 # 30 seconds
        MaxOpenRequests: 1 # Set to 1 for use with Idempotent Producer
        TLS:
          Enable: true
        SASL:
          Enable: true
          Version: 1
      Metadata:
        RefreshFrequency: 300000000000 # 5 minutes
      Consumer:
        Offsets:
          AutoCommit:
            Interval: 5000000000 # 5 seconds
          Retention: 604800000000000 # 1 week
      Producer:
        Idempotent: true # Must be false for Azure EventHubs
        RequiredAcks: -1 # -1 = WaitForAll, Most stringent option for "at-least-once" delivery.
  eventing-kafka: |
    cloudevents:
      maxIdleConns: 1000
      maxIdleConnsPerHost: 100
    kafka:
      authSecretName: kafka-cluster
      authSecretNamespace: knative-eventing
      brokers: my-cluster-kafka-bootstrap.kafka:9092
    channel:
      adminType: kafka # One of "kafka", "azure", "custom"
      dispatcher:
        cpuRequest: 100m
        memoryRequest: 50Mi
      receiver:
        cpuRequest: 100m
        memoryRequest: 50Mi
The new channel implementation's config, by contrast, looks like this:
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-channel-config
  namespace: knative-eventing
data:
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
  auth.secret.ref.name: kafka-cluster
  auth.secret.ref.namespace: knative-eventing
Most of the options are not available in the new channel implementation because the new dataplane no longer uses Sarama. Some other options are not available because they are currently set to sensible defaults.
The dataplane can be configured by adjusting the values in a different configmap called config-kafka-channel-data-plane.
See the configuring dataplane section for more details.
The new channel implementation requires the secret to have:
- A new key called protocol
- A key called sasl.mechanism instead of saslType
The new channel implementation still supports the old secret format by inferring the protocol and sasl.mechanism from the values in the old secret. However, this fallback will be deprecated, and it is advised that you manually adjust your secrets to have these new keys.
In the consolidated channel implementation, the protocol was inferred from the available information in the secret. In the new channel, users are asked to define the protocol explicitly.
The key saslType is also renamed to sasl.mechanism, with the same possible values.
Using SASL authentication with SSL encryption looks like this in the previous secret format:
apiVersion: v1
kind: Secret
metadata:
  name: strimzi-sasl-secret
  namespace: knative-eventing
type: Opaque
data:
  # SSL encryption
  ca.crt: ...
  user.crt: ...
  user.key: ...
  # SASL authentication
  user: ...
  password: ...
  saslType: ...
The new implementation requires specifying the protocol explicitly:
apiVersion: v1
kind: Secret
metadata:
  name: strimzi-sasl-secret
  namespace: knative-eventing
type: Opaque
data:
  # SSL encryption
  ca.crt: ...
  user.crt: ...
  user.key: ...
  # SASL authentication
  user: ...
  password: ...
  sasl.mechanism: ... # RENAMED from saslType
  # Protocol
  protocol: ... # NEW
Possible values for the protocol are:
- PLAINTEXT: Neither SASL nor SSL is used
- SSL: Only SSL is used
- SASL_PLAINTEXT: Only SASL is used
- SASL_SSL: Both SASL and SSL are used
The new channel implementation can use the existing secrets as is. However, the old secret format will be deprecated soon, and it is advised to modify your secrets.
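When updating an existing secret by hand, the protocol value follows directly from whether SASL and SSL are in use, as listed above. A minimal sketch of that choice is shown here; the helper name protocol_for and its yes/no arguments are hypothetical, and this is not the inference logic the channel itself uses.

```shell
#!/bin/sh
# Hypothetical helper: picks the protocol value for a secret.
# $1 = "yes" if SASL authentication is used, anything else otherwise
# $2 = "yes" if SSL encryption is used, anything else otherwise
protocol_for() {
  case "$1:$2" in
    yes:yes) echo "SASL_SSL" ;;
    yes:*)   echo "SASL_PLAINTEXT" ;;
    *:yes)   echo "SSL" ;;
    *)       echo "PLAINTEXT" ;;
  esac
}

protocol_for yes yes   # prints SASL_SSL

# To add the key to an existing secret (not executed in this sketch):
# kubectl patch secret strimzi-sasl-secret -n knative-eventing --type merge \
#   -p "{\"stringData\":{\"protocol\":\"$(protocol_for yes yes)\"}}"
```

The commented kubectl command only adds the protocol key; renaming saslType to sasl.mechanism still has to be done separately.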
Consolidated channel controller creates a service per channel resource. This channel service is used as the address of the channel itself, and it will forward the events to the dispatcher. As each channel resource has its own service, they will also have a separate host name and the dispatcher will be able to identify the channel by looking at its hostname.
In the new KafkaChannel implementation, this is preserved. However, in future versions we will switch from host-based routing to path-based routing: a single hostname will be used for the channel ingress, and channels will be identified by their URL paths.
The new channel implementation's controllers use the same lease name as the previous controllers. This provides a nice and smooth transition to new controllers with a roll-out.
Starting with the 1.4 version of the new channel implementation, there is an automated migration post-install job available for the migration from the consolidated channel.
This job:
- Deletes unnecessary resources (old webhook, old hpa, old serviceAccount, old roles)
- Copies over relevant parts of old configmap (config-kafka) to new configmap (kafka-channel-config)
- Adjusts the channel services (the services that are created per channel instance) to use new channel ingress
- Eventually, deletes the old configmap (config-kafka) and the old deployments
All configuration options in the previous configmap are ignored, except these:
- eventing-kafka.kafka.brokers: Copied over to bootstrap.servers in new configmap
- eventing-kafka.kafka.authSecretName: Copied over to auth.secret.ref.name in new configmap
- eventing-kafka.kafka.authSecretNamespace: Copied over to auth.secret.ref.namespace in new configmap
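The key mapping above can be written down as a small lookup, which may help when checking a migrated configmap by hand. This is only an illustrative sketch: the helper name new_key_for is hypothetical, and the dotted paths are just a shorthand for the nested keys of the old configmap.

```shell
#!/bin/sh
# Hypothetical helper: prints the new configmap key for an old
# config-kafka option, or fails if the option is not migrated.
new_key_for() {
  case "$1" in
    eventing-kafka.kafka.brokers)             echo "bootstrap.servers" ;;
    eventing-kafka.kafka.authSecretName)      echo "auth.secret.ref.name" ;;
    eventing-kafka.kafka.authSecretNamespace) echo "auth.secret.ref.namespace" ;;
    *) return 1 ;;  # every other option is ignored by the migration
  esac
}

# Show what happens to a few old options.
for old in eventing-kafka.kafka.brokers eventing-kafka.kafka.authSecretName sarama.enableLogging; do
  if new="$(new_key_for "$old")"; then
    echo "$old -> $new"
  else
    echo "$old -> (ignored)"
  fi
done
```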
The secret is not modified, and the protocol information is inferred from what's available in the secret and the previous configmap. However, this inference will be deprecated and removed soon.
To migrate from the consolidated channel, apply the eventing-kafka-post-install.yaml artifact after you apply the KafkaChannel artifacts (eventing-kafka-controller.yaml, eventing-kafka-channel.yaml, eventing-kafka-channel-prometheus-operator.yaml) or the whole Knative Kafka suite (eventing-kafka.yaml).
An example run of the automated migration is shown below.
Please note that the commands are run on a cluster that already has Knative Eventing installed.
❯ kubectl get pods -n knative-eventing
NAME                                   READY   STATUS    RESTARTS   AGE
eventing-controller-5ff747b589-7994j   1/1     Running   0          13m
eventing-webhook-b9c6cb4f-hfm7p        1/1     Running   0          13m
eventing-webhook-b9c6cb4f-thjk4        1/1     Running   0          13m
eventing-webhook-b9c6cb4f-zgbh5        1/1     Running   0          13m
kafka-ch-controller-77cb8cf758-pv5sn   1/1     Running   0          91s
kafka-webhook-9c549589b-ghmkb          1/1     Running   0          91s
Create a channel, subscription, a logger pod (event-display) and an event source (ping-source):
# Create a channel
❯ cat <<-EOF | kubectl apply -f -
---
apiVersion: messaging.knative.dev/v1beta1
kind: KafkaChannel
metadata:
  name: kafka-channel
spec: {}
EOF
# Create an event-display service, which will write the events it receives to its console
❯ cat <<-EOF | kubectl apply -f -
---
apiVersion: v1
kind: Service
metadata:
  name: event-display
spec:
  ports:
  - port: 80
    protocol: TCP
    targetPort: 8080
  selector:
    run: event-display
---
apiVersion: v1
kind: Pod
metadata:
  labels:
    run: event-display
  name: event-display
spec:
  containers:
  - image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    name: event-display
    ports:
    - containerPort: 8080
    resources: {}
  dnsPolicy: ClusterFirst
  restartPolicy: Always
EOF
# Create a subscription for the channel
❯ cat <<-EOF | kubectl apply -f -
apiVersion: messaging.knative.dev/v1
kind: Subscription
metadata:
  name: subscription
spec:
  channel:
    apiVersion: messaging.knative.dev/v1beta1
    kind: KafkaChannel
    name: kafka-channel
  subscriber:
    ref:
      apiVersion: v1
      kind: Service
      name: event-display
EOF
# Create an event source (ping source) that sends events to the channel
❯ cat <<-EOF | kubectl apply -f -
apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: test-ping-source
spec:
  schedule: "*/1 * * * *"
  contentType: "application/json"
  data: '{"message": "Hello world!"}'
  sink:
    ref:
      apiVersion: messaging.knative.dev/v1beta1
      kind: KafkaChannel
      name: kafka-channel
EOF
# Watch the logs of the event-display pod
❯ kubectl logs event-display --follow
2022/06/03 09:30:48 Failed to read tracing config, using the no-op default: empty json tracing config
☁️ cloudevents.Event
Context Attributes,
  specversion: 1.0
  type: dev.knative.sources.ping
  source: /apis/v1/namespaces/default/pingsources/test-ping-source
  id: 2adc0860-c4dc-43ac-88b5-c15e1eaebb91
  time: 2022-06-03T09:34:00.060054762Z
  datacontenttype: application/json
Data,
  {
    "message": "Hello world!"
  }
Apply the new channel manifest (here, the whole Knative Kafka suite with broker, sink, source and channel):
❯ kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.4.2/eventing-kafka.yaml
Apply post-install manifest:
❯ kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.4.2/eventing-kafka-post-install.yaml
Watch the logs of the post-install job:
❯ kubectl logs -n knative-eventing kafka-controller-post-install-5ds4v --follow
{"severity":"INFO","timestamp":"2022-06-03T11:10:46.640412574Z","caller":"logging/config.go:116","message":"Successfully created the logger."}
{"severity":"INFO","timestamp":"2022-06-03T11:10:46.640508022Z","caller":"logging/config.go:117","message":"Logging level set to: info"}
{"severity":"INFO","timestamp":"2022-06-03T11:10:46.640548813Z","caller":"logging/config.go:79","message":"Fetch GitHub commit ID from kodata failed","error":"open /var/run/ko/HEAD: no such file or directory"}
{"level":"info","ts":1654254646.771741,"logger":"fallback","caller":"post-install/kafka_channel_migrator.go:68","msg":"Waiting 10m0s for the new data plane to become ready before the migration."}
{"level":"info","ts":1654254646.7808318,"logger":"fallback","caller":"post-install/kafka_channel_migrator.go:76","msg":"New data plane is ready, progressing with the migration"}
{"level":"info","ts":1654254646.7809098,"logger":"fallback","caller":"post-install/kafka_channel_migrator.go:93","msg":"Starting migration of channel services to new dispatcher service: kafka-channel-ingress.knative-eventing.svc.cluster.local."}
{"level":"info","ts":1654254646.7837079,"logger":"fallback","caller":"post-install/kafka_channel_migrator.go:138","msg":"Patching service default/kafka-channel-kn-channel with the patch: [{\"op\":\"replace\", \"path\": \"/spec/externalName\", \"value\": \"kafka-channel-ingress.knative-eventing.svc.cluster.local\"}]."}
{"level":"info","ts":1654254646.7915254,"logger":"fallback","caller":"post-install/kafka_channel_migrator.go:193","msg":"Migrating configmap."}
{"level":"info","ts":1654254646.794911,"logger":"fallback","caller":"post-install/kafka_channel_migrator.go:225","msg":"Patching configmap kafka-channel-config with patch [{\"op\":\"replace\", \"path\": \"/data/bootstrap.servers\", \"value\": \"my-cluster-kafka-bootstrap.kafka:9092\"} {\"op\":\"replace\", \"path\": \"/data/auth.secret.ref.namespace\", \"value\": \"\"} {\"op\":\"replace\", \"path\": \"/data/auth.secret.ref.name\", \"value\": \"\"}]"}
{"level":"info","ts":1654254646.8132904,"logger":"fallback","caller":"post-install/kafka_channel_post_migration_deleter.go:54","msg":"Waiting 10m0s for the new control plane to become ready before the migration."}
{"level":"info","ts":1654254646.859228,"logger":"fallback","caller":"post-install/kafka_channel_post_migration_deleter.go:62","msg":"New control plane is ready, waiting 10m0s before deleting old data plane"}
{"level":"info","ts":1654255246.859558,"logger":"fallback","caller":"post-install/kafka_channel_post_migration_deleter.go:64","msg":"Done waiting 10m0s. Deleting old data plane..."}
The configmap config-kafka-channel-data-plane contains 4 different keys to configure dataplane defaults:
- config-kafka-channel-producer.properties: Passed to the underlying Vert.x Kafka Client when creating a Kafka producer
- config-kafka-channel-consumer.properties: Passed to the underlying Vert.x Kafka Client when creating a Kafka consumer
- config-kafka-channel-webclient.properties: Passed to the underlying Vert.x Web Client when creating a web client that makes the HTTP requests to subscribers
- config-kafka-channel-httpserver.properties: Passed to Vert.x core when creating an HTTP server for the ingress
Values of these keys are in .properties format.
You may find the default value of this configmap in the repository.
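Because each of these values is plain .properties text (one key=value pair per line), a single setting can be inspected with ordinary shell tools. The sketch below is an assumption-laden illustration: the helper name prop_value is hypothetical, it handles only simple key=value lines, and the sample settings (acks, linger.ms) are standard Kafka producer options used as placeholders, not necessarily the defaults shipped in the repository.

```shell
#!/bin/sh
# Hypothetical helper: prints the value of property $1 from .properties
# text on stdin. Only simple "key=value" lines are handled; lines
# starting with "#" are treated as comments and skipped.
prop_value() {
  key="$1"
  while IFS= read -r line; do
    case "$line" in
      "#"*) continue ;;
      "$key="*) echo "${line#"$key="}"; return 0 ;;
    esac
  done
  return 1
}

# Illustrative sample only, not the real default configmap content.
sample='# producer settings
acks=all
linger.ms=0'

printf '%s\n' "$sample" | prop_value acks

# Against a cluster, the text could be fetched first (not executed here):
# kubectl get configmap config-kafka-channel-data-plane -n knative-eventing \
#   -o jsonpath='{.data.config-kafka-channel-producer\.properties}' | prop_value acks
```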
If you are using a Knative MTChannelBasedBroker Broker backed by a KafkaChannel, it is strongly recommended to use KafkaBroker instead.
This will save you additional HTTP hops, from channel to broker and broker to channel.
- Namespace dispatchers are not supported at the moment.