This tutorial guides you through setting up and running the Snowflake JDBC Source Connector for Confluent Cloud, which lets you stream data from your Snowflake database into a Kafka topic on Confluent Cloud over JDBC.
Before starting this tutorial, you should have the following:
- Basic understanding of Snowflake, JDBC, Confluent Cloud, and Kubernetes.
- Access to a Snowflake warehouse/database.
- Access to a Confluent Cloud Kafka cluster.
- A Kubernetes cluster set up and ready to use.
- Confluent for Kubernetes installed on the Kubernetes cluster.
- Knowledge of how to work with YAML files.
- The following tools installed: `kubectl`, a zip utility, and `shasum` (each is used later in this tutorial).
First, you need to download two important files:
- The Confluent JDBC Connector. You can download it from the Confluent Hub.
- The Snowflake JDBC driver. Download it from the Maven Repository (or fetch it from the command line, as sketched below).
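If you prefer the command line, the driver can be downloaded directly from Maven Central. This is a sketch; the URL follows Maven's standard repository layout for `net.snowflake:snowflake-jdbc`, so adjust the version number if you need a different driver release.

```bash
# Fetch the Snowflake JDBC driver from Maven Central.
curl -LO https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.13.16/snowflake-jdbc-3.13.16.jar
```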
This step involves modifying the Confluent JDBC Connector to include the Snowflake JDBC driver. Here's how you do it:
- Extract the Confluent JDBC Connector zip file and navigate to the `lib` folder.
- Copy the Snowflake JDBC driver JAR (`snowflake-jdbc-3.13.16.jar`) into this `lib` folder.
- Compress the entire folder as a zip file, just as it was before you extracted it. You now have a JDBC Connector plugin with the Snowflake JDBC driver added. (These steps are scripted in the shell sketch after this list.)
- Note the SHA512 checksum of this newly created file (you will need this later).
shasum -a 512 '<your new zip file>'
- Upload the new zip file to a location that can be accessed from your Kubernetes cluster where Kafka Connect is deployed. For example, you could use Amazon S3 or Google Cloud Storage.
- Alternatively, you may use the preconfigured zip file referenced in the Connect manifest below; it contains the JDBC Source Connector with the Snowflake JDBC driver already added.
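The repackaging steps above can be scripted. The following is a minimal sketch; the connector and driver file names match the versions used in this tutorial, while the S3 bucket name is a placeholder for your own storage location (the upload also assumes a configured AWS CLI).

```bash
# Unpack the connector, drop in the Snowflake driver, and re-zip it.
unzip confluentinc-kafka-connect-jdbc-10.7.3.zip
cp snowflake-jdbc-3.13.16.jar confluentinc-kafka-connect-jdbc-10.7.3/lib/
zip -r confluentinc-kafka-connect-jdbc-snowflake-10.7.3.zip confluentinc-kafka-connect-jdbc-10.7.3

# Record the SHA512 checksum for the Connect manifest.
shasum -a 512 confluentinc-kafka-connect-jdbc-snowflake-10.7.3.zip

# Upload to a location reachable from the Kubernetes cluster (S3 here as an example).
aws s3 cp confluentinc-kafka-connect-jdbc-snowflake-10.7.3.zip s3://your-bucket/
```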
- To create an API key, log on to Confluent Cloud, navigate to your cluster, and create a key there. Make sure to download the key and secret for later reference.
In this step, we'll create a Kubernetes secret to store the API key.
- Create a file named `plain.txt` and enter the API key and secret in the following format (replace `YOUR_API_KEY` and `YOUR_SECRET` with your actual API key and secret):

```
username=YOUR_API_KEY
password=YOUR_SECRET
```
- Using `kubectl`, create a Kubernetes secret from the file you just created. The following command creates a secret named `ccloud-credentials`, which will be used later for authentication:
kubectl create secret generic ccloud-credentials --from-file=plain.txt
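Optionally, confirm the secret exists before moving on. The secret generally needs to live in the same namespace as the Connect cluster; the manifests below use the `confluent` namespace, so pass `-n confluent` if that is not your current default.

```bash
# Verify the secret was created and contains the plain.txt key.
kubectl describe secret ccloud-credentials -n confluent
```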
- Ensure that the Confluent for Kubernetes operator is installed on your Kubernetes cluster. For more information, refer to the Confluent for Kubernetes Quickstart.
- Create a new YAML file for Connect, as shown below. Note the following:
- The `build` section references the URL and SHA512 checksum of the modified JDBC connector you created earlier. Replace the `archivePath` value with the actual URL of your modified JDBC connector.
- Alternatively, you may use the preconfigured zip file referenced in the `archivePath` and `checksum` values below; it contains the JDBC Source Connector with the Snowflake JDBC driver already added.
- The `dependencies` section contains the bootstrap URL (found under Cluster Settings at https://confluent.cloud/), the secret containing the API key, and TLS settings that let Connect communicate with Confluent Cloud using the TLS certificates pre-installed in the image.
```yaml
apiVersion: platform.confluent.io/v1beta1
kind: Connect
metadata:
  name: connect
  namespace: confluent
spec:
  replicas: 1
  image:
    application: confluentinc/cp-server-connect:7.4.0
    init: confluentinc/confluent-init-container:2.6.0
  build:
    type: onDemand
    onDemand:
      plugins:
        locationType: url
        url:
          - name: "kafka-connect-jdbc"
            archivePath: "https://gtrout-public.s3.us-east-2.amazonaws.com/confluentinc-kafka-connect-jdbc-snowflake-10.7.3.zip"
            checksum: "82cf0fe99a88318f56e37636da94af8ae24df0e89fc7e4c7c21bd459fb226a9bb95942db5a744e499ae1cd4bdf87e679a4976480a71f4e0c423ce9d88f28aa4f"
  dependencies:
    kafka:
      bootstrapEndpoint: "pkc-xxxxx.us-east-2.aws.confluent.cloud:9092"
      authentication:
        type: plain
        jaasConfig:
          secretRef: "ccloud-credentials"
      tls:
        enabled: true
        ignoreTrustStoreConfig: true
```
- Apply the manifest to create the Connect cluster by running the following command:
kubectl apply -f connect.yaml
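Optionally, verify that the Connect cluster is up and that the plugin installed correctly. A quick sketch, assuming the `confluent` namespace and Connect's standard REST port:

```bash
# Wait for the Connect custom resource to report a running state.
kubectl get connect connect -n confluent

# Inspect the installed plugins via the Connect REST API.
kubectl port-forward connect-0 8083:8083 -n confluent &
curl -s http://localhost:8083/connector-plugins | grep -i jdbc
```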
At this point, you have a Connect cluster running and connected to your Confluent Cloud cluster. It has the Snowflake JDBC Source Connector plugin installed, which means a connector of this type can be deployed. The next step is to configure the Snowflake JDBC Source connector, referencing all the relevant Snowflake settings such as `connection.url` (which embeds the warehouse, database, and other parameters). If needed, refer to Snowflake's JDBC Driver Connection Parameter Reference page for more details.
- Create a new YAML file for the Connector, as shown below:
```yaml
apiVersion: platform.confluent.io/v1beta1
kind: Connector
metadata:
  name: snowflake-jdbc-source
  namespace: confluent
spec:
  class: "io.confluent.connect.jdbc.JdbcSourceConnector"
  taskMax: 1
  connectClusterRef:
    name: connect
  configs:
    connection.url: "jdbc:snowflake://xxxxxxx-xxxxxxx.snowflakecomputing.com/?warehouse=EXAMPLEWAREHOUSE&db=EXAMPLEDB&role=PUBLIC&schema=PUBLIC&user=USER123&password=password&tracing=ALL"
    table.whitelist: "FOO"
    mode: "timestamp+incrementing"
    timestamp.column.name: "UPDATE_TS"
    incrementing.column.name: "ID"
    topic.prefix: "snowflake-"
    validate.non.null: "false"
    errors.log.enable: "true"
    errors.log.include.messages: "true"
```
- Apply the manifest to create the connector:
kubectl apply -f connector.yaml
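You can then watch the connector's status through the Connector custom resource or the Connect REST API (reusing the port-forward from the earlier verification step, if you set one up):

```bash
# Status as reported by the Connector custom resource.
kubectl get connector snowflake-jdbc-source -n confluent

# Status as reported by the Connect REST API (requires an active port-forward to connect-0:8083).
curl -s http://localhost:8083/connectors/snowflake-jdbc-source/status
```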
Remember to replace the placeholders in the `connection.url` field with your actual Snowflake details. The `table.whitelist` field should contain the name of the table you want to source from. The `timestamp.column.name` and `incrementing.column.name` fields should contain the names of the timestamp and incrementing columns, respectively. The `topic.prefix` value is prepended to the table name to form the name of the topic where the data will be published. The `errors.log.enable` and `errors.log.include.messages` fields enable error logging and include error messages in the logs, respectively.
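To confirm data is flowing end to end, you can consume from the target topic. The sketch below assumes the Confluent CLI is installed and logged in to your Confluent Cloud cluster; with `topic.prefix` set to `snowflake-` and table `FOO`, the topic name is `snowflake-FOO`.

```bash
# Consume the records produced by the connector (Ctrl+C to stop).
confluent kafka topic consume snowflake-FOO --from-beginning
```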
Confluent Control Center provides a self-hosted web UI for monitoring and managing your Connect cluster and other Confluent components, including Confluent Cloud Kafka clusters.
If you wish to set it up:
- Create a new YAML file for Control Center, as shown below. Note the following:
- The `kafka` dependency can be removed if you only need Control Center to manage the Connect cluster. In this example, the dependency is included, and it uses the same API key/secret as the Connect cluster.
- The `externalAccess` section is only needed if you want to use a load balancer to access Control Center externally. To get started quickly, or if your Kubernetes cluster is not configured to provision `LoadBalancer` services, you can remove this entire section and instead access Control Center at localhost:9021 in your local browser after running the following command:

kubectl port-forward controlcenter-0 9021:9021

- Replace `your-domain.net` with the domain where your Kubernetes cluster is running. You may be able to enter any value here and still access Control Center using the external IP address assigned to the Control Center service. To retrieve this IP address, run the following command:

kubectl get service controlcenter-bootstrap-lb
```yaml
apiVersion: platform.confluent.io/v1beta1
kind: ControlCenter
metadata:
  name: controlcenter
  namespace: confluent
spec:
  replicas: 1
  image:
    application: confluentinc/cp-enterprise-control-center:7.4.0
    init: confluentinc/confluent-init-container:2.6.0
  dataVolumeCapacity: 10Gi
  configOverrides:
    server:
      - confluent.metrics.topic.max.message.bytes=8388608
  dependencies:
    kafka:
      bootstrapEndpoint: "pkc-xxxxx.us-east-2.aws.confluent.cloud:9092"
      authentication:
        type: plain
        jaasConfig:
          secretRef: "ccloud-credentials"
      tls:
        enabled: true
        ignoreTrustStoreConfig: true
    connect:
      - name: connect
        url: http://connect.confluent.svc.cluster.local:8083
  externalAccess:
    type: loadBalancer
    loadBalancer:
      domain: your-domain.net
```
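- Apply the manifest to create Control Center by running the following command (assuming you saved the file as controlcenter.yaml):

kubectl apply -f controlcenter.yaml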
When troubleshooting, ensure that your kubectl context is set to the `confluent` namespace (or the namespace where your resources are deployed). To set the namespace for all subsequent kubectl commands in your current context, run the following command:
kubectl config set-context --current --namespace=confluent
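To confirm which namespace the current context points at, you can use the standard check from the Kubernetes documentation:

```bash
# Print the namespace of the current context (empty output means "default").
kubectl config view --minify --output 'jsonpath={..namespace}'
```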
To check the status of all Confluent custom resources, use the following command:
kubectl get confluent
To get detailed descriptions of the Connect, Connector, or ControlCenter custom resources, use the following commands:
kubectl describe connect connect
kubectl describe connector snowflake-jdbc-source
kubectl describe controlcenter controlcenter
Note: The names of the Connect, Connector, and ControlCenter resources are `connect`, `snowflake-jdbc-source`, and `controlcenter`, respectively.
To view the logs of the Confluent for Kubernetes pod, Connect pod, ControlCenter pod, and Connect init container's pod, use the following commands:
Confluent for Kubernetes pod logs:
kubectl logs confluent-operator-xxxxxxxxxx-xxxxx
Connect pod logs:
kubectl logs connect-0
ControlCenter pod logs:
kubectl logs controlcenter-0
Connect pod's init container logs (useful if the connector plugin failed to install):
kubectl logs connect-0 -c config-init-container
To view all events in the namespace, sorted by timestamp, use the following command:
kubectl get events --sort-by=.metadata.creationTimestamp