Skip to content

Commit

Permalink
Added configuration parameters for zookeeper connect timeout, zookeep… (
Browse files Browse the repository at this point in the history
#35)

* Added configuration parameters for zookeeper connect timeout, zookeeper client timeout, and zookeeper retry expiry. Fixes #33. Fixes #34.

* Updated README.md
  • Loading branch information
jcustenborder authored Dec 13, 2019
1 parent ab6714f commit 143bcd7
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 5 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

# Introduction
[Documentation](https://jcustenborder.github.io/kafka-connect-documentation/projects/kafka-connect-solr)

[Confluent Hub](https://www.confluent.io/hub/jcustenborder/kafka-connect-solr)

The SOLR connector is a high speed mechanism for writing data to [Apache Solr](http://lucene.apache.org/solr/).

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>com.github.jcustenborder.kafka.connect</groupId>
<artifactId>kafka-connect-parent</artifactId>
<version>2.1.1-cp1</version>
<version>2.2.1-cp1</version>
</parent>
<artifactId>kafka-connect-solr</artifactId>
<version>0.1-SNAPSHOT</version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,31 @@ class CloudSolrSinkConnectorConfig extends SolrSinkConnectorConfig {

public static final String ZOOKEEPER_HOSTS_CONFIG = "solr.zookeeper.hosts";
public static final String ZOOKEEPER_CHROOT_CONFIG = "solr.zookeeper.chroot";
public static final String COLLECTION_NAME_CONFIG = "solr.collection.name";
public static final String ZOOKEEPER_CONNECT_TIMEOUT_CONFIG = "solr.zookeeper.connect.timeout.ms";
public static final String ZOOKEEPER_CLIENT_TIMEOUT_CONFIG = "solr.zookeeper.client.timeout.ms";
public static final String ZOOKEEPER_RETRY_EXPIRY_TIME_CONFIG = "solr.zookeeper.retry.expiry.time.ms";


private static final String ZOOKEEPER_HOSTS_DOC = "Zookeeper hosts that are used to store solr configuration.";
private static final String ZOOKEEPER_CHROOT_DOC = "Chroot within solr for the zookeeper configuration.";
private static final String ZOOKEEPER_CONNECT_TIMEOUT_DOC = "Set the connect timeout to the zookeeper ensemble in ms.";
private static final String ZOOKEEPER_CLIENT_TIMEOUT_DOC = "Set the timeout to the zookeeper ensemble in ms.";
private static final String ZOOKEEPER_RETRY_EXPIRY_TIME_DOC = "This is the time to wait to refetch the " +
"state after getting the same state version from ZK in ms.";

public final List<String> zookeeperHosts;
public final String zookeeperChroot;
public final int zookeeperConnectTimeoutMs;
public final int zookeeperClientTimeoutMs;
public final int zookeeperRetryExpiryTimeMs;

protected CloudSolrSinkConnectorConfig(Map<String, String> props) {
super(config(), props);
this.zookeeperHosts = this.getList(ZOOKEEPER_HOSTS_CONFIG);
this.zookeeperChroot = this.getString(ZOOKEEPER_CHROOT_CONFIG);
this.zookeeperConnectTimeoutMs = getInt(ZOOKEEPER_CONNECT_TIMEOUT_CONFIG);
this.zookeeperClientTimeoutMs = getInt(ZOOKEEPER_CLIENT_TIMEOUT_CONFIG);
this.zookeeperRetryExpiryTimeMs = getInt(ZOOKEEPER_RETRY_EXPIRY_TIME_CONFIG);
}


Expand All @@ -54,6 +68,27 @@ public static ConfigDef config() {
.group(CONNECTION_GROUP)
.defaultValue(null)
.build()
).define(
ConfigKeyBuilder.of(ZOOKEEPER_CONNECT_TIMEOUT_CONFIG, ConfigDef.Type.INT)
.importance(ConfigDef.Importance.LOW)
.documentation(ZOOKEEPER_CONNECT_TIMEOUT_DOC)
.group(CONNECTION_GROUP)
.defaultValue(15000)
.build()
).define(
ConfigKeyBuilder.of(ZOOKEEPER_CLIENT_TIMEOUT_CONFIG, ConfigDef.Type.INT)
.importance(ConfigDef.Importance.LOW)
.documentation(ZOOKEEPER_CLIENT_TIMEOUT_DOC)
.group(CONNECTION_GROUP)
.defaultValue(45000)
.build()
).define(
ConfigKeyBuilder.of(ZOOKEEPER_RETRY_EXPIRY_TIME_CONFIG, ConfigDef.Type.INT)
.importance(ConfigDef.Importance.LOW)
.documentation(ZOOKEEPER_RETRY_EXPIRY_TIME_DOC)
.group(CONNECTION_GROUP)
.defaultValue(3000)
.build()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.github.jcustenborder.kafka.connect.solr;


import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;
Expand All @@ -26,6 +25,7 @@

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class CloudSolrSinkTask extends SolrSinkTask<CloudSolrSinkConnectorConfig> {
private static final Logger log = LoggerFactory.getLogger(CloudSolrSinkTask.class);
Expand All @@ -35,7 +35,7 @@ protected CloudSolrSinkConnectorConfig config(Map settings) {
return new CloudSolrSinkConnectorConfig(settings);
}

SolrClient client;
CloudSolrClient client;

@Override
public void start(Map<String, String> settings) {
Expand All @@ -44,6 +44,9 @@ public void start(Map<String, String> settings) {
builder.withZkHost(this.config.zookeeperHosts);
builder.withZkChroot(this.config.zookeeperChroot);
this.client = builder.build();
this.client.setZkConnectTimeout(this.config.zookeeperConnectTimeoutMs);
this.client.setZkClientTimeout(this.config.zookeeperClientTimeoutMs);
this.client.setRetryExpiryTime((int) TimeUnit.SECONDS.convert(this.config.zookeeperRetryExpiryTimeMs, TimeUnit.MILLISECONDS));
}

@Override
Expand Down

0 comments on commit 143bcd7

Please sign in to comment.