Refactor to simplify configuration. Moved to CloudSolrSinkConnector instead of specifying which DocumentHandler to use.
jcustenborder committed Mar 21, 2016
1 parent 58a6486 commit c842233
Showing 22 changed files with 104 additions and 143 deletions.
60 changes: 26 additions & 34 deletions README.md
@@ -1,62 +1,54 @@

Currently the CloudSolr configuration is the only one that has been tested.


# Topic Configuration

| Name | Description | Type | Default | Importance |
|------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------|---------|----------------------------------------------------------------|------------|
| <solr config instance>.topic | Kafka topic to map to | string | | High |
| <solr config instance>.solr.input.document.converter.class | Factory class used to get the SolrInputDocumentConverter implementation. | class | io.confluent.connect.solr.sink.solr.SolrInputDocumentConverter | High |
| <solr config instance>.commit.within | Configures Solr UpdaterRequest for a commit within the requested number of milliseconds. | int | null | Low |
| <solr config instance>.column.ignore.unknown.fields | Flag to determine if the connector should raise an exception when it encountered a field it doesn't have configured. | boolean | false | Low |


# Field Configuration

Field configuration is driven by a mapping from the source Kafka Connect schema to the SOLR schema. In the table below <connect field name> is replaced with the field name of the connect schema.

| Name | Description | Type | Default | Importance |
|----------------------------------------------------------------------|---------------------------------------------------|--------|---------|------------|
| <solr config instance>.column.mappings.<connect field name>.field | Name of the field in the solr schema. | string | | High |
| <solr config instance>.column.mappings.<connect field name>.currency | Currency code to use when writing data for field. | string | null | Low |

# CloudSolr Configuration

CloudSolr can be configured by using `io.confluent.connect.solr.sink.solr.CloudSolrInputDocumentHandlerFactory`.
## Topic Configuration

```
solr.input.document.handler.class=io.confluent.connect.solr.sink.solr.CloudSolrInputDocumentHandlerFactory
```
Multiple topics can be managed by a single connector. This is configured per topic by prefixing each setting with a `<solr config instance>`. For example, the configuration below uses `solr0` for the first topic, `solr1` for the second, and so on.

The following configuration items are specific to the CloudSolr implementation.
| Name | Description | Type | Default | Importance |
|--------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------|---------|----------------------------------------------------------------|------------|
| `<solr config instance>`.topic | Kafka topic to map to | string | | High |
| `<solr config instance>`.collection.name | Solr collection to write the data to | string | | High |
| `<solr config instance>`.solr.input.document.converter.class | Factory class used to get the SolrInputDocumentConverter implementation. | class | io.confluent.connect.solr.sink.SolrInputDocumentConverter | High |
| `<solr config instance>`.commit.within                       | Configures the Solr UpdateRequest for a commit within the requested number of milliseconds.                          | int     | null                                                           | Low        |
| `<solr config instance>`.column.ignore.unknown.fields        | Flag to determine if the connector should raise an exception when it encounters a field it doesn't have configured.  | boolean | false                                                          | Low        |
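
As a sketch of the per-topic prefixing described above, a connector handling two topics might look like this (the topic and collection names `tweets` and `retweets` are illustrative, not from this repository):

```
solr0.topic=tweets
solr0.collection.name=tweets
solr1.topic=retweets
solr1.collection.name=retweets
```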

| Name | Description | Type | Default | Importance |
|-----------------------|------------------------------------------------------------|--------|---------|------------|
| solr.zookeeper.hosts | Zookeeper hosts that are used to store solr configuration. | string | | High |
| solr.zookeeper.chroot | Chroot within solr for the zookeeper configuration. | string | null | High |
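
For instance, a connector pointed at a three-node ZooKeeper ensemble with Solr under a chroot might use the following (hostnames and the `/solr` chroot are illustrative; `solr.zookeeper.chroot` can be omitted when Solr lives at the ZooKeeper root):

```
solr.zookeeper.hosts=zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
solr.zookeeper.chroot=/solr
```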
## Field Configuration

Field configuration is driven by a mapping from the source Kafka Connect schema to the SOLR schema. In the table below `<connect field name>` is replaced with the field name of the connect schema.

| Name | Description | Type | Default | Importance |
|--------------------------------------------------------------------------|---------------------------------------------------|--------|---------|------------|
| `<solr config instance>`.column.mappings.`<connect field name>`.field | Name of the field in the solr schema. | string | | High |
| `<solr config instance>`.column.mappings.`<connect field name>`.currency | Currency code to use when writing data for field. | string | null | Low |

## Complete Example

The following example pulls from the Kafka topic `twitter` and writes to the SOLR collection `twitter` using 2 tasks.

```
name=solrcloud
topics=twitter
tasks.max=2
connector.class=io.confluent.connect.solr.sink.SolrSinkConnector
solr.input.document.handler.class=io.confluent.connect.solr.sink.solr.CloudSolrInputDocumentHandlerFactory
connector.class=io.confluent.connect.solr.sink.CloudSolrSinkConnector
solr.zookeeper.hosts=192.168.99.100:2181
solr0.topic=twitter
solr0.collection.name=twitter
solr0.commit.within=1000
solr0.column.ignore.uknown.fields=true
solr0.column.ignore.unknown.fields=true
solr0.column.mappings.createdAt.field=created_date
solr0.column.mappings.favoriteCount.field=favorite_count
solr0.column.mappings.text.field=text
```

# HttpSolr

2 changes: 1 addition & 1 deletion pom.xml
@@ -26,7 +26,7 @@
<artifactId>kafka-connect-solr</artifactId>
<packaging>jar</packaging>
<version>1.0.0-SNAPSHOT</version>
<name>kafka-connect-jdbc</name>
<name>kafka-connect-solr</name>
<organization>
<name>Confluent, Inc.</name>
<url>http://confluent.io</url>
5 changes: 2 additions & 3 deletions solrcloud.properties
@@ -17,13 +17,12 @@
name=solrcloud
topics=twitter
tasks.max=2
connector.class=io.confluent.connect.solr.sink.SolrSinkConnector
solr.input.document.handler.class=io.confluent.connect.solr.sink.solr.CloudSolrInputDocumentHandlerFactory
connector.class=io.confluent.connect.solr.sink.CloudSolrSinkConnector
solr.zookeeper.hosts=192.168.99.100:2181
solr0.topic=twitter
solr0.collection.name=twitter
solr0.commit.within=1000
solr0.column.ignore.uknown.fields=true
solr0.column.ignore.unknown.fields=true
solr0.column.mappings.createdAt.field=created_date
solr0.column.mappings.favoriteCount.field=favorite_count
solr0.column.mappings.text.field=text
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.connect.solr.sink.solr;
package io.confluent.connect.solr.sink;

import io.confluent.connect.solr.sink.config.CloudSolrSinkTopicConfig;
import org.apache.solr.client.solrj.SolrClient;
Original file line number Diff line number Diff line change
@@ -13,24 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.connect.solr.sink.solr;
package io.confluent.connect.solr.sink;

import io.confluent.connect.solr.sink.config.CloudSolrInputDocumentHandlerConfig;
import io.confluent.connect.solr.sink.config.CloudSolrSinkConnectorConfig;
import io.confluent.connect.solr.sink.config.CloudSolrSinkTopicConfig;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.CloudSolrClient;

import java.util.List;
import java.util.Map;

public class CloudSolrInputDocumentHandlerFactory extends SolrInputDocumentHandlerFactory {
class CloudSolrInputDocumentHandlerFactory extends SolrInputDocumentHandlerFactory {

CloudSolrInputDocumentHandlerConfig cloudSolrSinkConnectorConfig;
CloudSolrSinkConnectorConfig cloudSolrSinkConnectorConfig;
SolrClient solrClient;

@Override
public void initialize(Map<String, String> props) {
this.cloudSolrSinkConnectorConfig = new CloudSolrInputDocumentHandlerConfig(props);
this.cloudSolrSinkConnectorConfig = new CloudSolrSinkConnectorConfig(props);

List<String> zookeeperHost = this.cloudSolrSinkConnectorConfig.getZookeeperHosts();
String chroot = this.cloudSolrSinkConnectorConfig.getZookeeperChroot();
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.confluent.connect.solr.sink;

import org.apache.kafka.connect.connector.Task;

public class CloudSolrSinkConnector extends SolrSinkConnector {
@Override
public Class<? extends Task> taskClass() {
return CloudSolrSinkTask.class;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.confluent.connect.solr.sink;


import java.util.Map;

public class CloudSolrSinkTask extends SolrSinkTask {
@Override
protected SolrInputDocumentHandlerFactory getSolrInputDocumentHandlerFactory(Map<String, String> map) {
CloudSolrInputDocumentHandlerFactory cloudSolrInputDocumentHandlerFactory = new CloudSolrInputDocumentHandlerFactory();
cloudSolrInputDocumentHandlerFactory.initialize(map);
return cloudSolrInputDocumentHandlerFactory;
}
}
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.connect.solr.sink.solr;
package io.confluent.connect.solr.sink;

public class DefaultSolrInputDocumentConverter extends SolrInputDocumentConverter {

Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.connect.solr.sink.solr;
package io.confluent.connect.solr.sink;

import io.confluent.connect.solr.sink.config.HttpSolrSinkTopicConfig;
import org.apache.solr.client.solrj.SolrClient;
Original file line number Diff line number Diff line change
@@ -13,11 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.connect.solr.sink.solr;
package io.confluent.connect.solr.sink;

import java.util.Map;

public class HttpSolrInputDocumentHandlerFactory extends SolrInputDocumentHandlerFactory {
class HttpSolrInputDocumentHandlerFactory extends SolrInputDocumentHandlerFactory {

@Override
public void initialize(Map<String, String> props) {
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.connect.solr.sink.solr;
package io.confluent.connect.solr.sink;

import io.confluent.connect.solr.sink.config.FieldConfig;
import io.confluent.connect.solr.sink.config.SolrSinkTopicConfig;
Original file line number Diff line number Diff line change
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.connect.solr.sink.solr;
package io.confluent.connect.solr.sink;

import io.confluent.connect.solr.sink.config.SolrSinkTopicConfig;
import org.apache.kafka.connect.sink.SinkRecord;
@@ -27,7 +27,7 @@

import java.io.IOException;

public abstract class SolrInputDocumentHandler implements AutoCloseable {
abstract class SolrInputDocumentHandler implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(SolrInputDocumentHandler.class);

private final String topic;
Original file line number Diff line number Diff line change
@@ -13,12 +13,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.confluent.connect.solr.sink.solr;
package io.confluent.connect.solr.sink;

import java.util.HashMap;
import java.util.Map;

public abstract class SolrInputDocumentHandlerFactory implements AutoCloseable {
abstract class SolrInputDocumentHandlerFactory implements AutoCloseable {
public abstract void initialize(Map<String, String> props);
protected abstract SolrInputDocumentHandler create(String topic);

Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@
import java.util.Map;


public class SolrSinkConnector extends SinkConnector {
public abstract class SolrSinkConnector extends SinkConnector {

@Override
public String version() {
@@ -37,11 +37,6 @@ public void start(Map<String, String> map) {
this.config = map;
}

@Override
public Class<? extends Task> taskClass() {
return SolrSinkTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int count) {
List<Map<String, String>> results = new ArrayList<>();
11 changes: 4 additions & 7 deletions src/main/java/io/confluent/connect/solr/sink/SolrSinkTask.java
@@ -16,9 +16,6 @@
package io.confluent.connect.solr.sink;


import io.confluent.connect.solr.sink.config.SolrSinkTaskConfig;
import io.confluent.connect.solr.sink.solr.SolrInputDocumentHandler;
import io.confluent.connect.solr.sink.solr.SolrInputDocumentHandlerFactory;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
@@ -34,20 +31,20 @@
import java.util.Map;
import java.util.Set;

public class SolrSinkTask extends SinkTask {
public abstract class SolrSinkTask extends SinkTask {
private static final Logger log = LoggerFactory.getLogger(SolrSinkTask.class);
SolrInputDocumentHandlerFactory solrInputDocumentHandlerFactory;

protected abstract SolrInputDocumentHandlerFactory getSolrInputDocumentHandlerFactory(Map<String, String> map);

@Override
public String version() {
return VersionUtil.getVersion();
}

@Override
public void start(Map<String, String> map) {
SolrSinkTaskConfig config = new SolrSinkTaskConfig(map);
this.solrInputDocumentHandlerFactory = config.getSolrInputDocumentFactory();
this.solrInputDocumentHandlerFactory.initialize(map);
this.solrInputDocumentHandlerFactory = getSolrInputDocumentHandlerFactory(map);
}

@Override
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@
import java.util.List;
import java.util.Map;

public class CloudSolrInputDocumentHandlerConfig extends SolrInputDocumentHandlerConfig<CloudSolrSinkTopicConfig> {
public class CloudSolrSinkConnectorConfig extends SolrSinkConnectorConfig<CloudSolrSinkTopicConfig> {

public static final String ZOOKEEPER_HOSTS_CONFIG = "solr.zookeeper.hosts";
private static final String ZOOKEEPER_HOSTS_DOC = "Zookeeper hosts that are used to store solr configuration.";
@@ -42,7 +42,7 @@ public String getZookeeperChroot(){
return this.getString(ZOOKEEPER_CHROOT_CONFIG);
}

protected CloudSolrInputDocumentHandlerConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
protected CloudSolrSinkConnectorConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
super(subclassConfigDef, props);
}

@@ -52,7 +52,7 @@ protected CloudSolrSinkTopicConfig createTopicConfig(Map<String, String> props)
}


public CloudSolrInputDocumentHandlerConfig(Map<String, String> props) {
public CloudSolrSinkConnectorConfig(Map<String, String> props) {
this(configDef(), props);
}

Original file line number Diff line number Diff line change
@@ -19,11 +19,11 @@

import java.util.Map;

public class HttpSolrInputDocumentHandlerConfig extends SolrInputDocumentHandlerConfig {
public class HttpSolrSinkConnectorConfig extends SolrSinkConnectorConfig {



protected HttpSolrInputDocumentHandlerConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
protected HttpSolrSinkConnectorConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
super(subclassConfigDef, props);
}

@@ -32,7 +32,7 @@ protected SolrSinkTopicConfig createTopicConfig(Map props) {
return null;
}

public HttpSolrInputDocumentHandlerConfig(Map<String, String> props) {
public HttpSolrSinkConnectorConfig(Map<String, String> props) {
this(config,props);
}

Original file line number Diff line number Diff line change
@@ -15,16 +15,18 @@
*/
package io.confluent.connect.solr.sink.config;

import io.confluent.connect.solr.sink.solr.SolrInputDocumentHandlerFactory;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;

import java.util.*;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SolrInputDocumentHandlerConfig<T extends SolrSinkTopicConfig> extends AbstractConfig {
public abstract class SolrSinkConnectorConfig<T extends SolrSinkTopicConfig> extends AbstractConfig {
static ConfigDef config = baseConfigDef();

public static ConfigDef baseConfigDef() {
@@ -34,11 +36,11 @@ public static ConfigDef baseConfigDef() {

final Map<String, T> topicConfigLookup;

public SolrInputDocumentHandlerConfig(Map<String, String> props) {
public SolrSinkConnectorConfig(Map<String, String> props) {
this(config, props);
}

protected SolrInputDocumentHandlerConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
protected SolrSinkConnectorConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
super(subclassConfigDef, props);
this.topicConfigLookup = loadSolr();
}
