This project provides simple templates and instructions to build Apache Pulsar connectors on the base of existing Apache Kafka connectors.
Apache Pulsar's current acceptance criteria for connectors requires a developer brave and experienced enough with both Pulsar and third party systems to contribute the connector and required integration tests.
This project relaxes the criteria to allow developers to quickly move connectors they used with their Apache Kafka infrastructure into Apache Pulsar's, and incrementally work on improvements.
The project uses Apache Pulsar's Kafka Connect Adaptor (KCA). More information about KCA is available in this blog post. KCA is used for such popular Pulsar connectors as Pulsar Debezium Source Connectors and Pulsar Snowflake Sink Connector.
The connectors built with this project require Datastax Pulsar Luna 2.8+ or Apache Pulsar 2.9+.
For the details of the status of the specific connector and available connectors, navigate to
pulsar-connector/<connector name>
and check the readme provided by the contributor.
Added connectors, so far:
- Azure Data Explorer (Kusto)
- Azure DocumentDB
- Apache Geode
- Apache Kudu
- Apache Phoenix
- Apache PLC4X
- CoAP
- Couchbase
- DataDog Logs
- Diffusion
- Google BigQuery
- Hazelcast Jet
- Humio HEC
- JMS
- Kinetica
- MarkLogic
- MQTT
- Neo4J
- New Relic
- OrientDB
- Redis
- SAP HANA
- SingleStore
- Splunk
- XTDB
- Zeebe
- camel-aws-cloudwatch-sink
- camel-aws-ddb-sink
- camel-aws-ddb-streams-source
- camel-aws-ec2-sink
- camel-aws-eventbridge-sink
- camel-aws-kinesis-firehose-sink
- camel-aws-kinesis-sink
- camel-aws-kinesis-source
- camel-aws-lambda-sink
- camel-aws-redshift-sink
- camel-aws-redshift-source
- camel-aws-s3-sink
- camel-aws-s3-source
- camel-aws-s3-streaming-upload-sink
- camel-aws-secrets-manager-sink
- camel-aws-ses-sink
- camel-aws-sns-fifo-sink
- camel-aws-sns-sink
- camel-aws-sqs-batch-sink
- camel-aws-sqs-fifo-sink
- camel-aws-sqs-sink
- camel-aws-sqs-source
- camel-aws2-iam
- camel-aws2-kms
- camel-azure-cosmosdb-source
- camel-azure-eventhubs-sink
- camel-azure-eventhubs-source
- camel-azure-functions-sink
- camel-azure-servicebus-sink
- camel-azure-servicebus-source
- camel-azure-storage-blob-changefeed-source
- camel-azure-storage-blob-sink
- camel-azure-storage-blob-source
- camel-azure-storage-queue-sink
- camel-azure-storage-queue-source
- camel-beer-source
- camel-bitcoin-source
- camel-cassandra-sink
- camel-cassandra-source
- camel-ceph-sink
- camel-ceph-source
- camel-chuck-norris-source
- camel-couchbase-sink
- camel-cron-source
- camel-cxf
- camel-cxfrs
- camel-dropbox-sink
- camel-dropbox-source
- camel-earthquake-source
- camel-elasticsearch-index-sink
- camel-elasticsearch-search-source
- camel-exec-sink
- camel-fhir-source
- camel-file
- camel-file-watch-source
- camel-ftp-sink
- camel-ftp-source
- camel-ftps-sink
- camel-ftps-source
- camel-github-commit-source
- camel-github-event-source
- camel-github-pullrequest-comment-source
- camel-github-pullrequest-source
- camel-github-tag-source
- camel-google-bigquery-sink
- camel-google-calendar-source
- camel-google-functions-sink
- camel-google-mail-source
- camel-google-pubsub-sink
- camel-google-pubsub-source
- camel-google-sheets-source
- camel-google-storage-sink
- camel-google-storage-source
- camel-hdfs
- camel-http-secured-sink
- camel-http-secured-source
- camel-http-sink
- camel-http-source
- camel-https
- camel-infinispan-sink
- camel-infinispan-source
- camel-jdbc
- camel-jira-add-comment-sink
- camel-jira-add-issue-sink
- camel-jira-oauth-source
- camel-jira-source
- camel-jira-transition-issue-sink
- camel-jira-update-issue-sink
- camel-jms-amqp-10-sink
- camel-jms-amqp-10-source
- camel-jms-apache-activemq-sink
- camel-jms-apache-activemq-source
- camel-jms-apache-artemis-sink
- camel-jms-apache-artemis-source
- camel-jms-ibm-mq-sink
- camel-jms-ibm-mq-source
- camel-kafka-not-secured-sink
- camel-kafka-not-secured-source
- camel-kafka-sink
- camel-kafka-source
- camel-kafka-ssl-sink
- camel-kafka-ssl-source
- camel-kubernetes-namespaces-source
- camel-kubernetes-nodes-source
- camel-kubernetes-pods-source
- camel-log-sink
- camel-mail-imap-source
- camel-mail-sink
- camel-mariadb-sink
- camel-mariadb-source
- camel-minio-sink
- camel-minio-source
- camel-mongodb-changes-stream-source
- camel-mongodb-sink
- camel-mongodb-source
- camel-mqtt-sink
- camel-mqtt-source
- camel-mqtt5-sink
- camel-mqtt5-source
- camel-mysql-sink
- camel-mysql-source
- camel-nats-sink
- camel-nats-source
- camel-netty-http
- camel-netty
- camel-oracle-database-sink
- camel-oracle-database-source
- camel-postgresql-sink
- camel-postgresql-source
- camel-pulsar-sink
- camel-pulsar-source
- camel-rabbitmq-source
- camel-redis-sink
- camel-redis-source
- camel-rest-openapi-sink
- camel-salesforce-create-sink
- camel-salesforce-delete-sink
- camel-salesforce-source
- camel-salesforce-update-sink
- camel-scp-sink
- camel-sftp-sink
- camel-sftp-source
- camel-sjms2
- camel-slack-sink
- camel-slack-source
- camel-solr-sink
- camel-solr-source
- camel-splunk-hec-sink
- camel-splunk-sink
- camel-splunk-source
- camel-sqlserver-sink
- camel-sqlserver-source
- camel-ssh-sink
- camel-ssh-source
- camel-syslog
- camel-telegram-sink
- camel-telegram-source
- camel-timer-source
- camel-twitter-directmessage-source
- camel-twitter-search-source
- camel-twitter-timeline-source
- camel-webhook-source
- camel-websocket-source
- camel-wttrin-source
The rest of this documentation will dive into details of:
- How to build connectors
- How to use connectors
- How to add a connector
Ensure you have JDK 11+ and Maven 3.8 installed.
Clone the connector's repo and run mvn clean install
from the root.
The connector's .nar
files can be found at pulsar-connectors/<connector name>>/target/pulsar-3rdparty-pulsar-connectors-<connector name>-0.1.0-SNAPSHOT.nar
Follow Pulsar's documentation to use the packaged connector.
Follow the example below to create a config yaml file:
# Pulsar KCA Sink expects "processingGuarantees" to be "EFFECTIVELY_ONCE"`
processingGuarantees: "EFFECTIVELY_ONCE"
configs:
# Size of messages in bytes the sink will attempt to batch messages together before flush.
# batchSize: 16384
# Time interval in milliseconds the sink will attempt to batch messages together before flush.
# lingerTimeMs: 2147483647
# In case of Record<KeyValue<>> data use key from KeyValue<> instead of one from Record.
# unwrapKeyValueIfAvailable: "true"
# The Kafka topic name that passed to Kafka sink.
topic: "my-topic"
# Pulsar topic to store offsets at.
offsetStorageTopic: "kafka-connect-sink-offsets"
# A Kafka connector sink class to use.
kafkaConnectorSinkClass: "com.third.party.CoolSinkConnector"
# Config properties to pass to the Kafka connector.
kafkaConnectorConfigProperties:
# The following properties passed directly to Kafka Connect Sink and defined by it or by
# https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
name: "test-sink"
connector.class: "com.third.party.CoolSinkConnector"
tasks.max: "1"
topics: "my-topic"
...
Follow the example below to create a config yaml file:
tenant: "public"
namespace: "default"
name: "test-source"
topicName: "test-topic"
parallelism: 1
# A Kafka connector source class to use.
className: "com.third.party.CoolSourceConnector"
configs:
# Present the message only consist of payload.
# json-with-envelope: "false"
# Pulsar topic to store Kafka connector offsets at
offset.storage.topic: "kafka-connect-source-offsets"
# Pulsar namespace to store the output topics
topic.namespace: "public/default"
# Config properties to pass to the Kafka connector.
# The following properties passed directly to Kafka Connect Sink and defined by it or by
# https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
# A Kafka connector source class to use.
task.class: "com.third.party.CoolSourceConnector"
# The converter provided by Kafka Connect to convert record value.
value.converter: "org.apache.kafka.connect.json.JsonConverter"
# The converter provided by Kafka Connect to convert record key.
key.converter: "org.apache.kafka.connect.json.JsonConverter"
...
These steps help avoid some common problems encountered while using KCA to create a new connector:
- Ensure the connector's license allows its use and redistribution. A helpful starting point is here.
- Maven dependency conflict of transitive dependencies in build time.
- Dependency conflict in runtime caused by third-party dependencies packaged with the connector.
Check the content of the Kafka connector's jar file. If it includes third-party dependencies, you may need to "shade" it (rename some classes).
To do so, copy shaded-dependencies/template-shaded/
to shaded-dependencies/<connector name>
and add the new module into shaded-dependencies/pom.xml
.
Ensure that third-party dependencies are renamed as specified
in shaded-dependencies/<connector name>/pom.xml
and build (mvn clean install
).
- Copy
pulsar-connectors/template/
topulsar-connectors/<connector name>/
- Add the new module into
pulsar-connectors/pom.xml
- Update connector's name and description in
pulsar-connectors/<connector name>/src/main/resources/META-INF/services/pulsar-io.yaml
- Update the
pulsar-connectors/<connector name>/README.md
- Update the root
README.md
- Update
LICENSE
andNOTICE
files - Build (
mvn clean install
). - Run
mvn dependency:tree -Dverbose
to review how Maven auto-resolved potential dependency conflicts and fix as needed
To check the connector for CVEs:
mvn clean install verify -Powasp-dependency-check -DskipTests -f pulsar-connectors/<connector dir>/pom.xml
Detailed report will be at pulsar-connectors/<connector dir>/target/dependency-check-report.html