Cloud all the things! Running Schema Registry, Kafka Connect, and KSQL on GCE connecting to CCloud

Overview

This shows how to run the Confluent Platform components that are not yet in CCloud on GCE. It uses Docker on VMs, launched through GCP’s new create-with-container API. You could also run Docker on the VMs manually if you want, or maybe use k8s, but that’s a step beyond me…

The assumption is you have a CCloud broker endpoint and API credentials already.

Setup

  • Search and replace CCLOUD_USERNAME with your CCloud API user

  • Search and replace CCLOUD_PASSWORD with your CCloud API password

  • Search and replace CCLOUD_BROKER_HOST with your CCloud broker address

  • Search and replace GCP_PROJECT_NAME with your GCP project id

TODO: figure out appropriate bash so as to be able to set the above as environment variables passed into the instance launch commands :D
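One possible way to avoid the manual search and replace is sketched below. It assumes you have saved the launch commands from this page into a file (the name `launch-commands.sh` is hypothetical) and exported the four values as environment variables named after the placeholders. The helper names `escape_sed` and `fill_placeholders` are made up for this sketch. Values are substituted as-is, so a password containing shell metacharacters would still need extra care.

```shell
# Escape the characters that are special in a sed replacement string (\, / and &).
escape_sed() {
  printf '%s' "$1" | sed -e 's/[\/&]/\\&/g'
}

# fill_placeholders FILE
# Print FILE to stdout with the four placeholders replaced by the values of
# the environment variables of the same name. Fails if any of them is unset.
fill_placeholders() {
  : "${CCLOUD_USERNAME:?must be set}"
  : "${CCLOUD_PASSWORD:?must be set}"
  : "${CCLOUD_BROKER_HOST:?must be set}"
  : "${GCP_PROJECT_NAME:?must be set}"
  sed -e "s/CCLOUD_USERNAME/$(escape_sed "$CCLOUD_USERNAME")/g" \
      -e "s/CCLOUD_PASSWORD/$(escape_sed "$CCLOUD_PASSWORD")/g" \
      -e "s/CCLOUD_BROKER_HOST/$(escape_sed "$CCLOUD_BROKER_HOST")/g" \
      -e "s/GCP_PROJECT_NAME/$(escape_sed "$GCP_PROJECT_NAME")/g" \
      "$1"
}

# Example (not run here):
#   fill_placeholders launch-commands.sh > launch-commands.filled.sh
```

Review the generated file before running it, since it now contains your credentials in plain text.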

Set this to a unique value each time, otherwise you’ll end up with clashes on internal Kafka topics on your cluster—which is bad, m’kay?

export DEPLOYMENT_VERSION=x03
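If you would rather not pick the value by hand, a timestamp is one easy source of uniqueness. This is just a sketch, and the function name `new_deployment_version` is made up here. The leading `x` and the digits-only suffix keep the value valid inside both the instance names and the topic names used below.

```shell
# Print a deployment version such as x20180904153000 (UTC timestamp).
new_deployment_version() {
  printf 'x%s\n' "$(date -u +%Y%m%d%H%M%S)"
}

# Example (not run here):
#   export DEPLOYMENT_VERSION=$(new_deployment_version)
```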

Firewall

gcloud compute --project=GCP_PROJECT_NAME firewall-rules create allow-connect-connect-rest --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:8083 --source-ranges=0.0.0.0/0 --target-tags=kafka-connect
gcloud compute --project=GCP_PROJECT_NAME firewall-rules create allow-connect-ksql-server --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:8088 --source-ranges=0.0.0.0/0 --target-tags=ksql-server
gcloud compute --project=GCP_PROJECT_NAME firewall-rules create allow-connect-schema-registry --direction=INGRESS --priority=1000 --network=default --action=ALLOW --rules=tcp:8081 --source-ranges=0.0.0.0/0 --target-tags=schema-registry

Schema Registry

gcloud beta compute instances create-with-container rmoff-schema-registry-$DEPLOYMENT_VERSION \
        --zone=us-east1-b \
        --tags schema-registry \
        --container-image confluentinc/cp-schema-registry:5.0.0 \
        --container-env=SCHEMA_REGISTRY_HOST_NAME=localhost,SCHEMA_REGISTRY_LISTENERS=http://0.0.0.0:8081,SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=SASL_SSL://CCLOUD_BROKER_HOST:9092,SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL=SASL_SSL,SCHEMA_REGISTRY_KAFKASTORE_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;,SCHEMA_REGISTRY_KAFKASTORE_SASL_MECHANISM=PLAIN,SCHEMA_REGISTRY_LOG4J_ROOT_LOGLEVEL=INFO \
        --container-restart-policy=never

Take the external IP of the instance created, and set it as an environment variable:

export SCHEMA_REGISTRY_HOST=xx.xx.xx.xx
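Instead of copying the IP from the console output, you can probably ask gcloud for it. The sketch below assumes the instance has a single network interface with an external (NAT) IP, which is the default for instances created this way. The function name `external_ip` is made up here, and the zone defaults to the one used throughout this page.

```shell
# external_ip INSTANCE [ZONE]
# Print the external IP of a GCE instance.
external_ip() {
  local instance=$1 zone=${2:-us-east1-b}
  gcloud compute instances describe "$instance" \
    --zone="$zone" \
    --format='get(networkInterfaces[0].accessConfigs[0].natIP)'
}

# Example (not run here):
#   export SCHEMA_REGISTRY_HOST=$(external_ip "rmoff-schema-registry-$DEPLOYMENT_VERSION")
```

The same function works for the KSQL Server and Kafka Connect instances created in the following sections.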

KSQL Server

gcloud beta compute instances create-with-container rmoff-ksql-server-$DEPLOYMENT_VERSION \
        --zone=us-east1-b \
        --tags ksql-server \
        --container-image confluentinc/cp-ksql-server:5.0.0 \
        --container-env=KSQL_BOOTSTRAP_SERVERS=CCLOUD_BROKER_HOST:9092,KSQL_KSQL_SCHEMA_REGISTRY_URL=http://$SCHEMA_REGISTRY_HOST:8081,KSQL_KSQL_SERVER_UI_ENABLED=false,KSQL_APPLICATION_ID=rmoff-gcp-pipeline-demo-$DEPLOYMENT_VERSION,KSQL_KSQL_STREAMS_REPLICATION_FACTOR=3,KSQL_KSQL_SINK_REPLICAS=3,KSQL_LISTENERS=http://0.0.0.0:8088,KSQL_CACHE_MAX_BYTES_BUFFERING=0,KSQL_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=HTTPS,KSQL_SECURITY_PROTOCOL=SASL_SSL,KSQL_SASL_MECHANISM=PLAIN,KSQL_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\; \
        --container-restart-policy=never

Take the external IP of the instance created, and set it as an environment variable:

export KSQL_SERVER_HOST=xx.xx.xx.xx

Kafka Connect instance with REST source connector

gcloud beta compute instances create-with-container rmoff-connect-source-$DEPLOYMENT_VERSION \
        --zone=us-east1-b \
        --tags kafka-connect \
        --container-image confluentinc/cp-kafka-connect:5.0.0 \
        --container-env=^±^CONNECT_BOOTSTRAP_SERVERS=CCLOUD_BROKER_HOST:9092±CONNECT_REST_PORT=8083±CONNECT_GROUP_ID=compose-connect-group-source-$DEPLOYMENT_VERSION±CONNECT_CONFIG_STORAGE_TOPIC=docker-connect-configs-source-$DEPLOYMENT_VERSION±CONNECT_OFFSET_STORAGE_TOPIC=docker-connect-offsets-source-$DEPLOYMENT_VERSION±CONNECT_STATUS_STORAGE_TOPIC=docker-connect-status-source-$DEPLOYMENT_VERSION±CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_REST_ADVERTISED_HOST_NAME=localhost±CONNECT_LOG4J_ROOT_LOGLEVEL=INFO±CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR±CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3±CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3±CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3±CONNECT_PLUGIN_PATH=/usr/share/java,/u01/connectors/,/usr/share/confluent-hub-components±CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_SASL_MECHANISM=PLAIN±CONNECT_REQUEST_TIMEOUT_MS=20000±CONNECT_RETRY_BACKOFF_MS=500±CONNECT_SECURITY_PROTOCOL=SASL_SSL±CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_CONSUMER_SASL_MECHANISM=PLAIN±CONNECT_CONSUMER_REQUEST_TIMEOUT_MS=20000±CONNECT_CONSUMER_RETRY_BACKOFF_MS=500±CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL±CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_PRODUCER_SASL_MECHANISM=PLAIN±CONNECT_PRODUCER_REQUEST_TIMEOUT_MS=20000±CONNECT_PRODUCER_RETRY_BACKOFF_MS=500±CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL±CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;±CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;±CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\; \
        --container-restart-policy=never \
        --container-command=/bin/bash \
        --container-arg=-c \
        --container-arg=echo\ Installing\ unzip...\ \&\&\ curl\ -so\ unzip.deb\ http://ftp.br.debian.org/debian/pool/main/u/unzip/unzip_6.0-16\+deb8u3_amd64.deb\ \&\&\ dpkg\ -i\ unzip.deb\ \&\&\ echo\ Downloading\ connector...\ \&\&\ curl\ -so\ kafka-connect-rest.zip\ https://storage.googleapis.com/rmoff-connectors/kafka-connect-rest.zip\ \&\&\ echo\ Making\ folder\ for\ connector...\ \&\&\ mkdir\ -p\ /u01/connectors/\ \&\&\ echo\ Unzipping\ connector...\ \&\&\ unzip\ -j\ kafka-connect-rest.zip\ -d\ /u01/connectors/kafka-connect-rest\ \&\&\ echo\ Launching\ Connect...\ \&\&\ /etc/confluent/docker/run

Take the external IP of the instance created, and set it as an environment variable:

export CONNECT_HOST_SOURCE=xx.xx.xx.xx

Kafka Connect with GCS and GBQ sink connectors

Note that this stuffs the GCP credentials into the container-arg. You’ll need to replace the GCP_PRIVATE_KEY_ID, GCP_PRIVATE_KEY, GCP_USER, and GCP_USER_ID placeholders with your own values.

gcloud beta compute instances create-with-container rmoff-connect-gbq-gcs-$DEPLOYMENT_VERSION \
        --zone=us-east1-b \
        --tags kafka-connect \
        --container-image confluentinc/cp-kafka-connect:5.0.0 \
        --container-env=^±^CONNECT_BOOTSTRAP_SERVERS=CCLOUD_BROKER_HOST:9092±CONNECT_REST_PORT=8083±CONNECT_GROUP_ID=compose-connect-group-gbq-gcs-$DEPLOYMENT_VERSION±CONNECT_CONFIG_STORAGE_TOPIC=docker-connect-configs-gbq-gcs-$DEPLOYMENT_VERSION±CONNECT_OFFSET_STORAGE_TOPIC=docker-connect-offsets-gbq-gcs-$DEPLOYMENT_VERSION±CONNECT_STATUS_STORAGE_TOPIC=docker-connect-status-gbq-gcs-$DEPLOYMENT_VERSION±CONNECT_INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter±CONNECT_REST_ADVERTISED_HOST_NAME=localhost±CONNECT_LOG4J_ROOT_LOGLEVEL=INFO±CONNECT_LOG4J_LOGGERS=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR±CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=3±CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=3±CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=3±CONNECT_PLUGIN_PATH=/usr/share/java,/u01/connectors/,/usr/share/confluent-hub-components±CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_SASL_MECHANISM=PLAIN±CONNECT_REQUEST_TIMEOUT_MS=20000±CONNECT_RETRY_BACKOFF_MS=500±CONNECT_SECURITY_PROTOCOL=SASL_SSL±CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_CONSUMER_SASL_MECHANISM=PLAIN±CONNECT_CONSUMER_REQUEST_TIMEOUT_MS=20000±CONNECT_CONSUMER_RETRY_BACKOFF_MS=500±CONNECT_CONSUMER_SECURITY_PROTOCOL=SASL_SSL±CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=https±CONNECT_PRODUCER_SASL_MECHANISM=PLAIN±CONNECT_PRODUCER_REQUEST_TIMEOUT_MS=20000±CONNECT_PRODUCER_RETRY_BACKOFF_MS=500±CONNECT_PRODUCER_SECURITY_PROTOCOL=SASL_SSL±CONNECT_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;±CONNECT_CONSUMER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\;±CONNECT_PRODUCER_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule\ required\ username=\"CCLOUD_USERNAME\"\ password=\"CCLOUD_PASSWORD\"\; \
        --container-restart-policy=never \
        --container-command=/bin/bash \
        --container-arg=-c \
        --container-arg=echo\ \'\{\ \ \ \"type\":\ \"service_account\",\ \ \ \"project_id\":\ \"GCP_PROJECT_NAME\",\ \ \ \"private_key_id\":\ \"GCP_PRIVATE_KEY_ID\",\ \ \"private_key\":\ \"-----BEGIN\ PRIVATE\ KEY-----\\nGCP_PRIVATE_KEY\\n-----END\ PRIVATE\ KEY-----\\n\",\ \ \ \"client_email\":\ \"GCP_USER@GCP_PROJECT_NAME.iam.gserviceaccount.com\",\ \ \ \"client_id\":\ \"GCP_USER_ID\",\ \ \ \"auth_uri\":\ \"https://accounts.google.com/o/oauth2/auth\",\ \ \ \"token_uri\":\ \"https://accounts.google.com/o/oauth2/token\",\ \ \ \"auth_provider_x509_cert_url\":\ \"https://www.googleapis.com/oauth2/v1/certs\",\ \ \ \"client_x509_cert_url\":\ \"https://www.googleapis.com/robot/v1/metadata/x509/rmoff-gcs\%40GCP_PROJECT_NAME.iam.gserviceaccount.com\"\ \}\'\ \>\ /root/creds/gcp_creds.json\ \&\&\ confluent-hub\ install\ --no-prompt\ confluentinc/kafka-connect-gcs:5.0.0\ \&\&\ confluent-hub\ install\ --no-prompt\ wepay/kafka-connect-bigquery:1.1.0\ \&\&\ /etc/confluent/docker/run

Take the external IP of the instance created, and set it as an environment variable:

export CONNECT_HOST_SINK=xx.xx.xx.xx

Healthcheck

Kafka Connect source

export HOST=$CONNECT_HOST_SOURCE
export PORT=8083
export ENDPOINT=connectors
bash -c 'echo "Waiting for host to start listening on $HOST ⏳ ";while [ $(curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT) -eq 000 ];do curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT;date;sleep 5;done;nc -vz $HOST $PORT'

Kafka Connect sink

export HOST=$CONNECT_HOST_SINK
export PORT=8083
export ENDPOINT=connectors
bash -c 'echo "Waiting for host to start listening on $HOST ⏳ ";while [ $(curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT) -eq 000 ];do curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT;date;sleep 5;done;nc -vz $HOST $PORT'

Schema Registry

export HOST=$SCHEMA_REGISTRY_HOST
export PORT=8081
export ENDPOINT=
bash -c 'echo "Waiting for host to start listening on $HOST ⏳ ";while [ $(curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT) -eq 000 ];do curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT;date;sleep 5;done;nc -vz $HOST $PORT'

KSQL Server

export HOST=$KSQL_SERVER_HOST
export PORT=8088
export ENDPOINT=
bash -c 'echo "Waiting for host to start listening on $HOST ⏳ ";while [ $(curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT) -eq 000 ];do curl -s -o /dev/null -w "%{http_code}" http://$HOST:$PORT/$ENDPOINT;date;sleep 5;done;nc -vz $HOST $PORT'
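The four healthchecks above differ only in host, port, and endpoint, so they could be folded into one function. This is a sketch rather than a drop-in replacement: the function name `wait_for_http` and the optional polling-interval argument are made up here. It relies on the same trick as the one-liners, namely that curl reports an HTTP code of 000 while it cannot connect at all.

```shell
# wait_for_http HOST PORT [ENDPOINT] [INTERVAL_SECONDS]
# Block until something answers HTTP on HOST:PORT, polling every INTERVAL seconds.
wait_for_http() {
  local host=$1 port=$2 endpoint=${3:-} interval=${4:-5}
  echo "Waiting for host to start listening on $host"
  # curl prints 000 as the HTTP code when the connection itself fails.
  while [ "$(curl -s -o /dev/null -w '%{http_code}' "http://$host:$port/$endpoint")" = "000" ]; do
    date
    sleep "$interval"
  done
  echo "$host:$port is responding"
}

# Examples (not run here):
#   wait_for_http "$CONNECT_HOST_SOURCE" 8083 connectors
#   wait_for_http "$SCHEMA_REGISTRY_HOST" 8081
#   wait_for_http "$KSQL_SERVER_HOST" 8088
```

Like the originals, this only tells you that the port answers HTTP, not that the service is healthy.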

Connect to KSQL Server from CLI:

docker run --rm --interactive --tty confluentinc/cp-ksql-cli:5.0.0 http://$KSQL_SERVER_HOST:8088

Footnotes

Accessing log data from GCE instances and containers running within

Get stdout:

gcloud compute instances get-serial-port-output my-vm-name-goes-here

Docker output:

gcloud logging read "jsonPayload.container.name=/my-vm-name-goes-here AND logName=projects/my-project-name/logs/gcplogs-docker-driver" --format=json --order=asc --freshness=1h --limit=100 | jq '.[] | .jsonPayload.data'

Passing Environment Variables to containers

The environment variables for the Docker images are combined into a single --container-env argument, and the default comma separator is overridden to use ± instead (via the ^±^ prefix), since commas appear in some of the environment values. See the gcloud docs on escaping (gcloud topic escaping) for more details.
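To make the delimiter trick a bit more concrete, here is a small sketch that builds such a value from a list of KEY=VALUE pairs, so each variable can sit on its own line in a script. The function name `join_env` is made up for this example. It does not check whether a value itself contains ±, which would break the split.

```shell
# join_env KEY=VALUE...
# Join the arguments with ± and prefix the result with ^±^, which tells
# gcloud to split the list on ± instead of on commas.
join_env() {
  local delim='±' joined='' kv
  for kv in "$@"; do
    # Add the delimiter only between items, not before the first one.
    joined="${joined:+$joined$delim}$kv"
  done
  printf '^%s^%s\n' "$delim" "$joined"
}

# Example (not run here):
#   --container-env="$(join_env \
#       CONNECT_REST_PORT=8083 \
#       CONNECT_PLUGIN_PATH=/usr/share/java,/u01/connectors/)"
```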

Passing GCP credentials to a container

To write to GCS or GBQ the Docker container needs access to GCP credentials, which are a JSON file from GCP. Options considered:

  1. Bake into Docker image

    • Yuck

  2. Put on GCS and pull into container

    • Very insecure if public ACL

    • If not public ACL, then how do you auth the container to pull them? VM Scope would work for the host, but then you’d need to pull them from the host and pass to the container

  3. VM Scope / service account

    • Doesn’t look like these are inherited by the container

  4. Embed in the container startup arguments

    • Messy but works. The GCP Web UI console doesn’t permit container argument strings > 2049 (weird huh) but works fine passed through CLI:

      # POC for getting creds into container
      gcloud beta compute --project=GCP_PROJECT_NAME instances create-with-container cli-test-18 --zone=us-east1-b \
      --container-image=confluentinc/cp-kafka-connect:5.0.0 \
      --container-restart-policy=never \
      --container-command=/bin/bash \
      --container-arg=-c \
      --container-arg=echo\ \'\{\ \ \ \"type\":\ \"service_account\",\ \ \ \"project_id\":\ \"GCP_PROJECT_NAME\",\ \ \ \"private_key_id\":\ \"GCP_PRIVATE_KEY_ID\",\ \ \"private_key\":\ \"-----BEGIN\ PRIVATE\ KEY-----\\nGCP_PRIVATE_KEY\\n-----END\ PRIVATE\ KEY-----\\n\",\ \ \ \"client_email\":\ \"GCP_USER@GCP_PROJECT_NAME.iam.gserviceaccount.com\",\ \ \ \"client_id\":\ \"GCP_USER_ID\",\ \ \ \"auth_uri\":\ \"https://accounts.google.com/o/oauth2/auth\",\ \ \ \"token_uri\":\ \"https://accounts.google.com/o/oauth2/token\",\ \ \ \"auth_provider_x509_cert_url\":\ \"https://www.googleapis.com/oauth2/v1/certs\",\ \ \ \"client_x509_cert_url\":\ \"https://www.googleapis.com/robot/v1/metadata/x509/GCP_USER\%40GCP_PROJECT_NAME.iam.gserviceaccount.com\"\ \}\'\ \>\ /tmp/gcp_creds.json\ \&\&\ cat\ /tmp/gcp_creds.json\ \&\&\ sleep\ 6000
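A possible way to make option 4 less painful to type is to base64-encode the key file and decode it inside the container, which avoids the backslash escaping of the JSON. This is an untested-on-GCE sketch under a few assumptions: the function name `creds_to_container_cmd` and the file name `my-key.json` are made up, and it assumes `base64 -d` is available in the container image. The credentials still end up in the instance configuration, only encoded, so this is no more secure than the original.

```shell
# creds_to_container_cmd KEY_FILE [TARGET_PATH]
# Print a shell command that recreates KEY_FILE at TARGET_PATH when it is
# run inside the container.
creds_to_container_cmd() {
  local key_file=$1 target=${2:-/tmp/gcp_creds.json} encoded
  # Strip newlines so the encoded key is a single word on every platform.
  encoded=$(base64 < "$key_file" | tr -d '\n')
  printf 'echo %s | base64 -d > %s\n' "$encoded" "$target"
}

# Example (not run here), replacing the long escaped --container-arg:
#   --container-command=/bin/bash \
#   --container-arg=-c \
#   --container-arg="$(creds_to_container_cmd my-key.json) && /etc/confluent/docker/run"
```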