Listens on https port 7153
and fanout messages from messages
topic in Kafka.
- bash, jq, nc, grpcurl, protoc
- Kubernetes (e.g. Docker Desktop with Kubernetes enabled)
- kubectl
- helm 3.0+
The setup.sh
script:
- installs Zilla and Kafka to the Kubernetes cluster with helm and waits for the pods to start up
- creates the
messages
topic in Kafka. - starts port forwarding
./setup.sh
output:
+ ZILLA_CHART=oci://ghcr.io/aklivity/charts/zilla
+ helm upgrade --install zilla-grpc-kafka-fanout oci://ghcr.io/aklivity/charts/zilla --namespace zilla-grpc-kafka-fanout --create-namespace --wait [...]
NAME: zilla-grpc-kafka-fanout
LAST DEPLOYED: [...]
NAMESPACE: zilla-grpc-kafka-fanout
STATUS: deployed
REVISION: 1
NOTES:
Zilla has been installed.
[...]
+ helm upgrade --install zilla-grpc-kafka-fanout-kafka chart --namespace zilla-grpc-kafka-fanout --create-namespace --wait
NAME: zilla-grpc-kafka-fanout-kafka
LAST DEPLOYED: [...]
NAMESPACE: zilla-grpc-kafka-fanout
STATUS: deployed
REVISION: 1
TEST SUITE: None
++ kubectl get pods --namespace zilla-grpc-kafka-fanout --selector app.kubernetes.io/instance=kafka -o name
+ KAFKA_POD=pod/kafka-969789cc9-mxd98
+ kubectl exec --namespace zilla-grpc-kafka-fanout pod/kafka-969789cc9-mxd98 -- /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic messages --if-not-exists
Created topic messages.
+ kubectl port-forward --namespace zilla-grpc-kafka-fanout service/zilla 7153
+ nc -z localhost 7153
+ kubectl port-forward --namespace zilla-grpc-kafka-fanout service/kafka 9092 29092
+ sleep 1
+ nc -z localhost 7153
Connection to localhost port 7153 [tcp/websm] succeeded!
+ nc -z localhost 9092
Connection to localhost port 9092 [tcp/XmlIpcRegSvc] succeeded!
Prepare protobuf message to send to Kafka topic.
echo 'message: "test"' | protoc --encode=example.FanoutMessage proto/fanout.proto > binary.data
Produce protobuf message to Kafka topic, repeat to produce multiple messages.
kcat -P -b localhost:9092 -t messages -k -e ./binary.data
Stream messages via server streaming rpc.
grpcurl -insecure -proto proto/fanout.proto -d '' localhost:7153 example.FanoutService.FanoutServerStream
output:
{
"message": "test"
}
This output repeats for each message produced to Kafka.
Build the reliable streaming client which uses 32767
field as last message id to send as metadata to resume streaming from last received message.
cd grpc.reliable.streaming/
./mvnw clean install
cd ..
Connect with the reliable streaming client.
java -jar grpc.reliable.streaming/target/grpc-example-develop-SNAPSHOT-jar-with-dependencies.jar
output:
...
INFO: Found message: message: "test"
32767: "\001\002\000\002"
Simulate connection loss by stopping the zilla
service in the docker
stack.
kubectl scale --replicas=0 --namespace zilla-grpc-kafka-fanout deployment/zilla
Simulate connection recovery by starting the zilla
service again.
kubectl scale --replicas=1 --namespace zilla-grpc-kafka-fanout deployment/zilla
Now you need to restart the port-forward.
kubectl port-forward --namespace zilla-grpc-kafka-fanout service/zilla 7153 > /tmp/kubectl-zilla.log 2>&1 &
Then produce another protobuf message to Kafka, repeat to produce multiple messages.
kcat -P -b localhost:9092 -t messages -k -e ./binary.data
The reliable streaming client will recover and zilla deliver only the new message.
...
INFO: Found message: message: "test"
32767: "\001\002\000\004"
This output repeats for each message produced to Kafka after the zilla service is restart.
The teardown.sh
script stops port forwarding, uninstalls Zilla and deletes the namespace.
./teardown.sh
output:
+ pgrep kubectl
99998
99999
+ killall kubectl
+ helm uninstall zilla-grpc-kafka-fanout zilla-grpc-kafka-fanout-kafka --namespace zilla-grpc-kafka-fanout
release "zilla-grpc-kafka-fanout" uninstalled
release "zilla-grpc-kafka-fanout-kafka" uninstalled
+ kubectl delete namespace zilla-grpc-fanout
namespace "zilla-grpc-kafka-fanout" deleted