Skip to content

Commit

Permalink
Added support for HttpSolr. Added more docs.
Browse files Browse the repository at this point in the history
  • Loading branch information
jcustenborder committed Mar 21, 2016
1 parent c842233 commit 505b053
Show file tree
Hide file tree
Showing 12 changed files with 175 additions and 18 deletions.
59 changes: 59 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@ Currently the CloudSolr configuration is the only thing that has been tested.

# CloudSolr Configuration

## Connector configuration

```
connector.class=io.confluent.connect.solr.sink.CloudSolrSinkConnector
```

| Name | Description | Type | Default | Importance |
|-----------------------|------------------------------------------------------------|--------|---------|------------|
| solr.zookeeper.hosts | Zookeeper hosts that are used to store solr configuration. | List | | High |
| solr.zookeeper.chroot | Chroot within solr for the zookeeper configuration. | String | null | High |


## Topic Configuration
Expand Down Expand Up @@ -46,7 +56,56 @@ solr0.column.mappings.text.field=text

# HttpSolr

## Connector configuration

```
connector.class=io.confluent.connect.solr.sink.HttpSolrSinkConnector
```

| Name | Description | Type | Default | Importance |
|-----------------------|------------------------------------------------------------|--------|---------|------------|
| solr.url              | Base URL used to connect to Solr.                          | String |         | High       |


## Topic Configuration

Multiple topics can be managed by a single connector. This is configured per topic by prefixing a `<solr config instance>`. For example the configuration below uses `solr0` for the first topic, `solr1` for the second, etc.

| Name | Description | Type | Default | Importance |
|--------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------|---------|----------------------------------------------------------------|------------|
| `<solr config instance>`.topic | Kafka topic to map to | string | | High |
| `<solr config instance>`.core.name | Solr core to write the data to. | string | | High |
| `<solr config instance>`.solr.input.document.converter.class | Factory class used to get the SolrInputDocumentConverter implementation. | class | io.confluent.connect.solr.sink.SolrInputDocumentConverter | High |
| `<solr config instance>`.commit.within | Configures Solr UpdaterRequest for a commit within the requested number of milliseconds. | int | null | Low |
| `<solr config instance>`.column.ignore.unknown.fields        | Flag to determine whether the connector should raise an exception when it encounters a field it doesn't have configured. | boolean | false                                                           | Low        |

## Field Configuration

Field configuration is driven by a mapping from the source Kafka Connect schema to the SOLR schema. In the table below `<connect field name>` is replaced with the field name of the connect schema.

| Name | Description | Type | Default | Importance |
|--------------------------------------------------------------------------|---------------------------------------------------|--------|---------|------------|
| `<solr config instance>`.column.mappings.`<connect field name>`.field | Name of the field in the solr schema. | string | | High |
| `<solr config instance>`.column.mappings.`<connect field name>`.currency | Currency code to use when writing data for field. | string | null | Low |

## Complete Example

In the following example we pull from the Kafka topic `twitter` and write to the Solr core `twitter` using 2 tasks.

```
name=httpsolr
topics=twitter
tasks.max=2
connector.class=io.confluent.connect.solr.sink.HttpSolrSinkConnector
solr.url=http://192.168.99.100:8984/solr/
solr0.topic=twitter
solr0.core.name=twitter
solr0.commit.within=1000
solr0.column.ignore.unknown.fields=true
solr0.column.mappings.createdAt.field=created_date
solr0.column.mappings.favoriteCount.field=favorite_count
solr0.column.mappings.text.field=text
```



Expand Down
2 changes: 1 addition & 1 deletion solrcloud.properties → cloudsolr.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#

name=solrcloud
name=cloudsolr
topics=twitter
tasks.max=2
connector.class=io.confluent.connect.solr.sink.CloudSolrSinkConnector
Expand Down
7 changes: 6 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ zookeeper:
# environment:
# - "ZOOKEEPER_PORT_2181_TCP_ADDR=zookeeper"
# - "ZOOKEEPER_PORT_2181_TCP_PORT=2181"
solr:
solr_cloud:
image: solr
hostname: solr
command: "bash -c '/opt/solr/bin/solr start -f -z zookeeper:2181 -h solr'"
Expand All @@ -38,6 +38,11 @@ solr:
environment:
- "ZOOKEEPER_PORT_2181_TCP_ADDR=zookeeper"
- "ZOOKEEPER_PORT_2181_TCP_PORT=2181"
solr_standalone:
image: solr
hostname: solr
ports:
- "8984:8983"
#schema-registry:
# image: confluent/schema-registry
# entrypoint: "bash -c 'sleep 10;/usr/local/bin/schema-registry-docker.sh'"
Expand Down
29 changes: 29 additions & 0 deletions httpsolr.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#
# Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

name=httpsolr
topics=twitter
tasks.max=2
connector.class=io.confluent.connect.solr.sink.HttpSolrSinkConnector
solr.url=http://192.168.99.100:8984/solr/
solr0.topic=twitter
solr0.core.name=twitter
solr0.commit.within=1000
solr0.column.ignore.unknown.fields=true
solr0.column.mappings.createdAt.field=created_date
solr0.column.mappings.favoriteCount.field=favorite_count
solr0.column.mappings.text.field=text
#We are skipping user
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void initialize(Map<String, String> props) {
@Override
protected SolrInputDocumentHandler create(String topic) {
CloudSolrSinkTopicConfig cloudSolrSinkTopicConfig = this.cloudSolrSinkConnectorConfig.getTopicConfig(topic);
SolrInputDocumentConverter solrInputDocumentConverter = cloudSolrSinkTopicConfig.getSolrInputDocumentFactory();
SolrInputDocumentConverter solrInputDocumentConverter = cloudSolrSinkTopicConfig.getSolrInputDocumentConverter();

return new CloudSolrInputDocumentHandler(cloudSolrSinkTopicConfig, solrClient, solrInputDocumentConverter, cloudSolrSinkTopicConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.request.UpdateRequest;

import java.io.File;

class HttpSolrInputDocumentHandler extends SolrInputDocumentHandler {
public HttpSolrInputDocumentHandler(HttpSolrSinkTopicConfig topicConfig, SolrClient solrClient, SolrInputDocumentConverter solrInputDocumentConverter) {
super(topicConfig, solrClient, solrInputDocumentConverter);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,47 @@
*/
package io.confluent.connect.solr.sink;

import io.confluent.connect.solr.sink.config.HttpSolrSinkConnectorConfig;
import io.confluent.connect.solr.sink.config.HttpSolrSinkTopicConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;

class HttpSolrInputDocumentHandlerFactory extends SolrInputDocumentHandlerFactory {
private static final Logger log = LoggerFactory.getLogger(HttpSolrInputDocumentHandlerFactory.class);
HttpSolrSinkConnectorConfig httpSolrSinkConnectorConfig;

@Override
public void initialize(Map<String, String> props) {

this.httpSolrSinkConnectorConfig = new HttpSolrSinkConnectorConfig(props);
}

@Override
protected SolrInputDocumentHandler create(String topic) {
return null;
HttpSolrSinkTopicConfig topicConfig = this.httpSolrSinkConnectorConfig.getTopicConfig(topic);
SolrInputDocumentConverter solrInputDocumentConverter = topicConfig.getSolrInputDocumentConverter();
String coreName = topicConfig.getCoreName();
String coreUrl;
try {
URL baseUrl = new URL(this.httpSolrSinkConnectorConfig.getSolrUrl());
coreUrl = new URL(baseUrl,coreName).toString();
} catch (MalformedURLException e) {
throw new ConnectException("Invalid Solr Url", e);
}

if(log.isInfoEnabled()) {
log.info("Configuring topic '{}' to url '{}'", topicConfig.getCoreName(), coreUrl);
}

SolrClient solrClient = new HttpSolrClient(coreUrl);
return new HttpSolrInputDocumentHandler(topicConfig, solrClient, solrInputDocumentConverter);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package io.confluent.connect.solr.sink;

import org.apache.kafka.connect.connector.Task;

/**
 * Kafka Connect sink connector that writes records to a standalone Solr
 * instance over HTTP (as opposed to {@code CloudSolrSinkConnector}, which
 * talks to a SolrCloud cluster via Zookeeper).
 */
public class HttpSolrSinkConnector extends SolrSinkConnector {
/**
 * Returns the task implementation this connector instantiates.
 *
 * @return the {@link HttpSolrSinkTask} class.
 */
@Override
public Class<? extends Task> taskClass() {
return HttpSolrSinkTask.class;
}
}
13 changes: 13 additions & 0 deletions src/main/java/io/confluent/connect/solr/sink/HttpSolrSinkTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.confluent.connect.solr.sink;


import java.util.Map;

/**
 * Sink task for the HTTP Solr connector. Supplies the factory that builds
 * per-topic document handlers backed by a standalone Solr HTTP endpoint.
 */
public class HttpSolrSinkTask extends SolrSinkTask {
/**
 * Creates and initializes the handler factory for this task.
 *
 * @param map connector configuration properties passed to the task.
 * @return an initialized {@link HttpSolrInputDocumentHandlerFactory}.
 */
@Override
protected SolrInputDocumentHandlerFactory getSolrInputDocumentHandlerFactory(Map<String, String> map) {
final HttpSolrInputDocumentHandlerFactory factory = new HttpSolrInputDocumentHandlerFactory();
factory.initialize(map);
return factory;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,32 @@

import java.util.Map;

public class HttpSolrSinkConnectorConfig extends SolrSinkConnectorConfig {
public class HttpSolrSinkConnectorConfig extends SolrSinkConnectorConfig<HttpSolrSinkTopicConfig> {

public static final String SOLR_URL_CONFIG = "solr.url";
private static final String SOLR_URL_DOC = "Url to connect to solr with.";

static ConfigDef configDef(){
return baseConfigDef()
.define(SOLR_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SOLR_URL_DOC)
;
}


protected HttpSolrSinkConnectorConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
super(subclassConfigDef, props);
}

@Override
protected SolrSinkTopicConfig createTopicConfig(Map props) {
return null;
protected HttpSolrSinkTopicConfig createTopicConfig(Map props) {
return new HttpSolrSinkTopicConfig(props);
}

public HttpSolrSinkConnectorConfig(Map<String, String> props) {
this(config,props);
this(configDef(),props);
}


public String getSolrUrl() {
return this.getString(SOLR_URL_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@

public class HttpSolrSinkTopicConfig extends SolrSinkTopicConfig {

public static final String COLLECTION_NAME_CONFIG = "collection.name";
private static final String COLLECTION_NAME_DOC = "Name of the solr collection to write to.";
public static final String CORE_NAME_CONFIG = "core.name";
private static final String CORE_NAME_DOC = "Name of the solr collection to write to.";


static ConfigDef configDef(){
return baseConfigDef()
.define(COLLECTION_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, COLLECTION_NAME_DOC)
.define(CORE_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, CORE_NAME_DOC)
;
}

public String getCollectionName(){
return this.getString(COLLECTION_NAME_CONFIG);
public String getCoreName(){
return this.getString(CORE_NAME_CONFIG);
}

protected HttpSolrSinkTopicConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import java.util.regex.Pattern;

public abstract class SolrSinkTopicConfig extends AbstractConfig {
static ConfigDef config = baseConfigDef();

public static final String TOPIC_CONFIG = "topic";
private static final String TOPIC_DOC = "Kafka topic";

Expand Down Expand Up @@ -55,7 +53,7 @@ protected SolrSinkTopicConfig(ConfigDef subclassConfigDef, Map<String, String> p
this.fieldConfigs = loadFieldConfigs();
}

public SolrInputDocumentConverter getSolrInputDocumentFactory(){
public SolrInputDocumentConverter getSolrInputDocumentConverter(){
SolrInputDocumentConverter solrInputDocumentConverter = this.getConfiguredInstance(SOLR_INPUT_DOCUMENT_CONVERTER_CLASS_CONFIG, SolrInputDocumentConverter.class);
solrInputDocumentConverter.configure(this);
return solrInputDocumentConverter;
Expand Down

0 comments on commit 505b053

Please sign in to comment.