This connector pushes messages from an Apache Pulsar topic into a Snowflake database.
This connector is open source software, licensed under the Apache License 2.0, and is built on Snowflake's Connector for Kafka (documentation).
Please refer to this blog post for a quick walk-through.
Please refer to the Apache Pulsar IO documentation.
If the project is built from source, use pulsar-snowflake-connector/target/pulsar-snowflake-connector-0.1.0-SNAPSHOT.nar.
Parameter Name | Description | Default value |
---|---|---|
batchSize | Total size of messages, in bytes, that the sink attempts to batch together before flushing. | 16384 |
lingerTimeMs | Time interval, in milliseconds, during which the sink attempts to batch messages together before flushing. | 2147483647 |
topic | The topic name passed to the Kafka sink. | n/a |
kafkaConnectorSinkClass | The Kafka connector sink class to use. | com.snowflake.kafka.connector.SnowflakeSinkConnector |
offsetStorageTopic | The Pulsar topic in which offsets are stored. | snowflake-sink-offsets |
unwrapKeyValueIfAvailable | For Record<KeyValue<>> data, use the key from KeyValue<> instead of the one from Record. | true |
kafkaConnectorConfigProperties | Configuration properties to pass to the Kafka connector. | n/a |
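
The interaction between batchSize and lingerTimeMs can be pictured with a small sketch. It assumes that a flush happens as soon as either threshold is reached, which is one reading of the descriptions above rather than a statement about the connector's actual code. The BatchBuffer class and its method names are hypothetical and exist only for this illustration.

```python
import time


class BatchBuffer:
    """Hypothetical buffer that flushes on a size or a time threshold."""

    def __init__(self, batch_size=16384, linger_time_ms=2147483647,
                 clock=time.monotonic):
        self.batch_size = batch_size          # bytes, like batchSize
        self.linger_time_ms = linger_time_ms  # milliseconds, like lingerTimeMs
        self.clock = clock
        self.pending = []
        self.pending_bytes = 0
        self.last_flush = clock()
        self.flushed = []  # batches that would be handed to the sink

    def add(self, payload: bytes):
        """Queue one message and flush if a threshold has been reached."""
        self.pending.append(payload)
        self.pending_bytes += len(payload)
        self.flush_if_needed()

    def flush_if_needed(self):
        """Flush when enough bytes are pending or enough time has passed."""
        elapsed_ms = (self.clock() - self.last_flush) * 1000
        size_reached = self.pending_bytes >= self.batch_size
        time_reached = elapsed_ms >= self.linger_time_ms
        if self.pending and (size_reached or time_reached):
            self.flushed.append(self.pending)
            self.pending = []
            self.pending_bytes = 0
            self.last_flush = self.clock()


buf = BatchBuffer(batch_size=10)
buf.add(b"12345")        # 5 bytes pending, below the threshold
buf.add(b"67890")        # 10 bytes pending, triggers a flush
print(len(buf.flushed))  # 1
```

With the default lingerTimeMs of 2147483647, the time threshold is effectively never reached in this sketch, so only the size threshold triggers a flush.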
The list of kafkaConnectorConfigProperties can be found in the documentation for Snowflake's Connector for Kafka.
The snowflake.topic2table.map parameter is not supported. Snowflake's Connector expects the topic:table[,topic:table] format and does not handle Pulsar topic URLs such as persistent://tenant/namespace/topic-name.
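
To see why a Pulsar topic URL does not fit the topic:table[,topic:table] format, consider the sketch below. It is a naive, hypothetical parser written for this illustration, not the parser used by Snowflake's Connector; it only shows that the colon in persistent:// makes a colon-separated entry ambiguous.

```python
def parse_topic2table(value: str) -> dict:
    """Naive parser for 'topic:table[,topic:table]' (illustrative only)."""
    mapping = {}
    for entry in value.split(","):
        parts = entry.strip().split(":")
        if len(parts) != 2:
            # More than one colon: cannot tell topic and table apart.
            raise ValueError(f"expected topic:table, got {entry!r}")
        topic, table = parts
        mapping[topic] = table
    return mapping


# Plain topic names parse cleanly.
print(parse_topic2table("orders:ORDERS_TABLE,users:USERS_TABLE"))

# A Pulsar topic URL carries its own colon and is rejected by this parser.
try:
    parse_topic2table("persistent://tenant/namespace/topic-name:MY_TABLE")
except ValueError as err:
    print("rejected:", err)
```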
processingGuarantees: "EFFECTIVELY_ONCE"
configs:
  topic: "snowflake-demo"
  offsetStorageTopic: "snowflake-sink-offsets-demo"
  batchSize: "100"
  lingerTimeMs: "600000"
  kafkaConnectorConfigProperties:
    name: "snowflakedemo"
    connector.class: "com.snowflake.kafka.connector.SnowflakeSinkConnector"
    tasks.max: "1"
    topics: "snowflake-demo"
    buffer.count.records: "100"
    buffer.flush.time: "600"
    buffer.size.bytes: "102400"
    snowflake.url.name: "tenant.snowflakecomputing.com:443"
    snowflake.user.name: "kafka_connector_user"
    snowflake.private.key: "very_secret_key"
    snowflake.database.name: "kafka_db"
    snowflake.schema.name: "kafka_schema"
    key.converter: "org.apache.kafka.connect.storage.StringConverter"
    value.converter: "com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
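
In the example configuration, the sink-level values line up with the Kafka connector properties: topic equals topics, and lingerTimeMs (600000 ms) equals buffer.flush.time (600 s). Whether the connector requires this alignment is not stated here, so the following is only a sketch of a sanity check one might run before deploying. The check_alignment helper is hypothetical, and the dictionary mirrors a subset of the example values.

```python
def check_alignment(configs: dict) -> list:
    """Return notes about values that differ between the two config levels."""
    kafka = configs["kafkaConnectorConfigProperties"]
    notes = []
    if configs["topic"] != kafka["topics"]:
        notes.append("topic differs from topics")
    # lingerTimeMs is in milliseconds; buffer.flush.time is in seconds.
    if int(configs["lingerTimeMs"]) != int(kafka["buffer.flush.time"]) * 1000:
        notes.append("lingerTimeMs differs from buffer.flush.time")
    return notes


# Subset of the example configuration, written as a plain dictionary.
configs = {
    "topic": "snowflake-demo",
    "lingerTimeMs": "600000",
    "kafkaConnectorConfigProperties": {
        "topics": "snowflake-demo",
        "buffer.flush.time": "600",
    },
}

print(check_alignment(configs))  # []
```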
If you want to develop and test this library, you need to build it from source.
mvn clean install -DskipTests
The Apache Pulsar distribution is controlled by the pulsar.version and pulsar.distribution-name properties.
mvn release:prepare -Prelease -Dresume=false
The GitHub release is handled by a GitHub Action whenever a tag is pushed.