diff --git a/config/cloudsolr.properties b/config/cloudsolr.properties index 8731017..4457292 100644 --- a/config/cloudsolr.properties +++ b/config/cloudsolr.properties @@ -1,11 +1,11 @@ # -# Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) +# Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -17,7 +17,7 @@ name=cloudsolr topics=twitter tasks.max=2 -connector.class=io.confluent.connect.solr.sink.CloudSolrSinkConnector +connector.class=com.github.jcustenborder.kafka.connect.solr.CloudSolrSinkConnector solr.zookeeper.hosts=192.168.99.100:2181 solr0.topic=twitter solr0.collection.name=twitter diff --git a/config/httpsolr.properties b/config/httpsolr.properties index 0f38db7..9da2d1c 100644 --- a/config/httpsolr.properties +++ b/config/httpsolr.properties @@ -1,11 +1,11 @@ # -# Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) +# Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -17,7 +17,7 @@ name=httpsolr topics=twitter tasks.max=2 -connector.class=io.confluent.connect.solr.sink.HttpSolrSinkConnector +connector.class=com.github.jcustenborder.kafka.connect.solr.HttpSolrSinkConnector solr.url=http://192.168.99.100:8984/solr/ solr0.topic=twitter solr0.core.name=twitter diff --git a/docker-compose.yml b/docker-compose.yml index b018789..b86d0bd 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,11 +1,11 @@ # -# Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) +# Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pom.xml b/pom.xml index a98f5b2..d8bd0c1 100644 --- a/pom.xml +++ b/pom.xml @@ -1,13 +1,13 @@ - + - - - 4.0.0 + + 4.0.0 3.3.0 - io.confluent - kafka-connect-solr - jar - 1.0.0-SNAPSHOT - kafka-connect-solr - - Confluent, Inc. - http://confluent.io - - http://confluent.io - - A Kafka Connect connector copying data from Kafka to Solr. - - 2016 - - - Apache License 2.0 - http://www.apache.org/licenses/LICENSE-2.0.html - repo - - - - - scm:git:git://github.com/confluentinc/kafka-connect-jdbc.git - scm:git:git@github.com:confluentinc/kafka-connect-jdbc.git - https://github.com/confluentinc/kafka-connect-jdbc - HEAD - - - - 3.0.0 - 0.10.0.0-cp1 - 4.12 - 3.0 - 1.6.2 - 6.2.0 - 2.4 - UTF-8 - http://packages.confluent.io/maven/ - - - - - confluent - Confluent - ${confluent.maven.repo} - - - - - - org.apache.kafka - connect-api - ${kafka.version} - - - org.apache.solr - solr-solrj - ${solr.version} - - - io.confluent - kafka-connect-avro-converter - ${confluent.version} - - - - - - junit - junit - ${junit.version} - test - - - org.easymock - easymock - ${easymock.version} - test - - - org.powermock - powermock-module-junit4 - ${powermock.version} - test - - - org.powermock - powermock-api-easymock - ${powermock.version} - test - - - commons-io - commons-io - ${commons-io.version} - test - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.5.1 - true - - 1.7 - 1.7 - - - - maven-assembly-plugin - 2.6 - - - src/assembly/package.xml - - - - - make-assembly - package - - single - - - - - - org.apache.maven.plugins - maven-surefire-plugin - 2.19.1 - - -Djava.awt.headless=true - - - - com.mycila - license-maven-plugin - 3.0 - -
com/mycila/maven/plugin/license/templates/APACHE-2.txt
- - Jeremy Custenborder - jcustenborder@gmail.com - - - **/README - src/test/resources/** - src/main/resources/** - src/assembly/** - -
- - - - check - - - -
-
- - - - src/main/resources - true - - -
- - - - standalone - - - - maven-assembly-plugin - - - src/assembly/standalone.xml - - - - - - - + + com.github.jcustenborder.kafka.connect + kafka-connect-parent + 0.10.1.0-cp1 + + + io.confluent + kafka-connect-solr + jar + 0.1-SNAPSHOT + kafka-connect-solr + + https://github.com/jcustenborder/kafka-connect-solr + + A Kafka Connect connector copying data from Kafka to Solr. + + + 2016 + + + + The Apache License, Version 2.0 + https://www.apache.org/licenses/LICENSE-2.0 + repo + + + + scm:git:https://github.com/jcustenborder/kafka-connect-solr.git + scm:git:git://git@github.com:jcustenborder/kafka-connect-solr.git + https://github.com/jcustenborder/kafka-connect-solr + + + https://github.com/jcustenborder/kafka-connect-solr/issues + github + + + + 6.3.0 + + + + + org.apache.solr + solr-solrj + ${solr.version} + + + io.confluent.kafka + connect-utils + [0.1.14,0.1.100) + +
diff --git a/src/assembly/package.xml b/src/assembly/package.xml deleted file mode 100644 index fc40880..0000000 --- a/src/assembly/package.xml +++ /dev/null @@ -1,41 +0,0 @@ - - - package - - dir - - false - - - ${project.basedir} - share/doc/${project.name}/ - - README* - LICENSE* - NOTICE* - licenses/ - - - - ${project.basedir}/config - etc/${project.name} - - * - - - - - - share/java/${project.name} - true - true - - io.confluent:common-* - org.apache.kafka:connect-api - - - - diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnector.java new file mode 100644 index 0000000..e938277 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnector.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.solr; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; + +public class CloudSolrSinkConnector extends SolrSinkConnector { + @Override + public Class taskClass() { + return CloudSolrSinkTask.class; + } + + @Override + public ConfigDef config() { + return CloudSolrSinkConnectorConfig.config(); + } + + +} diff --git a/src/main/java/io/confluent/connect/solr/sink/config/CloudSolrSinkConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfig.java similarity index 56% rename from src/main/java/io/confluent/connect/solr/sink/config/CloudSolrSinkConnectorConfig.java rename to src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfig.java index 02b93f8..829ebb3 100644 --- a/src/main/java/io/confluent/connect/solr/sink/config/CloudSolrSinkConnectorConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfig.java @@ -1,59 +1,49 @@ /** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package io.confluent.connect.solr.sink.config; +package com.github.jcustenborder.kafka.connect.solr; import org.apache.kafka.common.config.ConfigDef; -import java.util.List; import java.util.Map; -public class CloudSolrSinkConnectorConfig extends SolrSinkConnectorConfig { +public class CloudSolrSinkConnectorConfig extends SolrSinkConnectorConfig { public static final String ZOOKEEPER_HOSTS_CONFIG = "solr.zookeeper.hosts"; - private static final String ZOOKEEPER_HOSTS_DOC = "Zookeeper hosts that are used to store solr configuration."; public static final String ZOOKEEPER_CHROOT_CONFIG = "solr.zookeeper.chroot"; + private static final String ZOOKEEPER_HOSTS_DOC = "Zookeeper hosts that are used to store solr configuration."; private static final String ZOOKEEPER_CHROOT_DOC = "Chroot within solr for the zookeeper configuration."; - - static ConfigDef configDef(){ - return baseConfigDef() - .define(ZOOKEEPER_HOSTS_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, ZOOKEEPER_HOSTS_DOC) - .define(ZOOKEEPER_CHROOT_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, ZOOKEEPER_CHROOT_DOC) - ; - } - - public List getZookeeperHosts(){ - return this.getList(ZOOKEEPER_HOSTS_CONFIG); - } - - public String getZookeeperChroot(){ - return this.getString(ZOOKEEPER_CHROOT_CONFIG); - } - - protected CloudSolrSinkConnectorConfig(ConfigDef subclassConfigDef, Map props) { - super(subclassConfigDef, props); - } - - @Override - protected CloudSolrSinkTopicConfig createTopicConfig(Map props) { - return new CloudSolrSinkTopicConfig(props); + public static final String COLLECTION_NAME_CONFIG = "collection.name"; + private static final String COLLECTION_NAME_DOC = "Name of the solr collection to write to."; + + public final String zookeeperHosts; + public final String zookeeperChroot; + public final String collectionName; + + protected CloudSolrSinkConnectorConfig(Map props) { + super(config(), props); + this.zookeeperHosts = this.getString(ZOOKEEPER_HOSTS_CONFIG); + this.zookeeperChroot = this.getString(ZOOKEEPER_CHROOT_CONFIG); + this.collectionName = this.getString(COLLECTION_NAME_CONFIG); } - public CloudSolrSinkConnectorConfig(Map props) { - this(configDef(), props); + public static ConfigDef config() { + return SolrSinkConnectorConfig.config() + .define(ZOOKEEPER_HOSTS_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, ZOOKEEPER_HOSTS_DOC) + .define(ZOOKEEPER_CHROOT_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, ZOOKEEPER_CHROOT_DOC) + .define(COLLECTION_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, COLLECTION_NAME_DOC); } - } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkTask.java new file mode 100644 index 0000000..ff5c7ad --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkTask.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.solr; + + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; + +import java.util.Map; + +public class CloudSolrSinkTask extends SolrSinkTask { + + + @Override + protected CloudSolrSinkConnectorConfig config(Map settings) { + return new CloudSolrSinkConnectorConfig(settings); + } + + @Override + protected SolrClient client() { + CloudSolrClient.Builder builder = new CloudSolrClient.Builder(); + builder.withZkHost(this.config.zookeeperHosts); + builder.withZkChroot(this.config.zookeeperChroot); + return builder.build(); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkConnector.java new file mode 100644 index 0000000..1cc1646 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkConnector.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.solr; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; + +public class HttpSolrSinkConnector extends SolrSinkConnector { + @Override + public Class taskClass() { + return HttpSolrSinkTask.class; + } + + @Override + public ConfigDef config() { + return HttpSolrSinkConnectorConfig.config(); + } + + +} diff --git a/src/main/java/io/confluent/connect/solr/sink/config/HttpSolrSinkConnectorConfig.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkConnectorConfig.java similarity index 53% rename from src/main/java/io/confluent/connect/solr/sink/config/HttpSolrSinkConnectorConfig.java rename to src/main/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkConnectorConfig.java index 65c48d1..156b74d 100644 --- a/src/main/java/io/confluent/connect/solr/sink/config/HttpSolrSinkConnectorConfig.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkConnectorConfig.java @@ -1,50 +1,47 @@ /** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ -package io.confluent.connect.solr.sink.config; +package com.github.jcustenborder.kafka.connect.solr; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.types.Password; import java.util.Map; -public class HttpSolrSinkConnectorConfig extends SolrSinkConnectorConfig { - +public class HttpSolrSinkConnectorConfig extends SolrSinkConnectorConfig { + public static final String CORE_NAME_CONFIG = "solr.core.name"; + private static final String CORE_NAME_DOC = "Name of the solr core to write to."; public static final String SOLR_URL_CONFIG = "solr.url"; private static final String SOLR_URL_DOC = "Url to connect to solr with."; - static ConfigDef configDef(){ - return baseConfigDef() - .define(SOLR_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SOLR_URL_DOC) - ; - } - protected HttpSolrSinkConnectorConfig(ConfigDef subclassConfigDef, Map props) { - super(subclassConfigDef, props); - } + public final String solrUrl; + public final String coreName; - @Override - protected HttpSolrSinkTopicConfig createTopicConfig(Map props) { - return new HttpSolrSinkTopicConfig(props); - } public HttpSolrSinkConnectorConfig(Map props) { - this(configDef(),props); + super(config(), props); + this.solrUrl = this.getString(SOLR_URL_CONFIG); + this.coreName = this.getString(CORE_NAME_CONFIG); + } - public String getSolrUrl() { - return this.getString(SOLR_URL_CONFIG); + public static ConfigDef config() { + return SolrSinkConnectorConfig.config() + .define(CORE_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, CORE_NAME_DOC) + .define(SOLR_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SOLR_URL_DOC); } } diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkTask.java new file mode 100644 index 0000000..a259105 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkTask.java @@ -0,0 +1,39 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.solr; + + +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +public class HttpSolrSinkTask extends SolrSinkTask { + private static final Logger log = LoggerFactory.getLogger(HttpSolrSinkTask.class); + + @Override + protected HttpSolrSinkConnectorConfig config(Map settings) { + return new HttpSolrSinkConnectorConfig(settings); + } + + @Override + protected SolrClient client() { + ConcurrentUpdateSolrClient.Builder builder = new ConcurrentUpdateSolrClient.Builder(this.config.solrUrl); + return builder.build(); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/solr/SolrInputDocumentBuilder.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/SolrInputDocumentBuilder.java new file mode 100644 index 0000000..78153dd --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/SolrInputDocumentBuilder.java @@ -0,0 +1,73 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.solr; + +import com.google.common.base.Preconditions; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.solr.common.SolrInputDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; + +class SolrInputDocumentBuilder { + private static final Logger log = LoggerFactory.getLogger(SolrInputDocumentBuilder.class); + + public static SolrInputDocument build(SinkRecord record) { + Preconditions.checkNotNull(record, "record cannot be null."); + SolrInputDocument document = new SolrInputDocument(); + + if (record.value() instanceof Map) { + if (log.isTraceEnabled()) { + log.trace("Processing {}:{}:{} as Map", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset() + ); + } + Map map = (Map) record.value(); + for (Object key : map.keySet()) { + Object value = map.get(key); + log.trace("Setting {} to {}.", key, value); + document.addField(key.toString(), value); + } + } else if (record.value() instanceof Struct) { + if (log.isTraceEnabled()) { + log.trace("Processing {}:{}:{} as Struct", + record.topic(), + record.kafkaPartition(), + record.kafkaOffset() + ); + } + Struct struct = (Struct) record.value(); + List fields = struct.schema().fields(); + for (Field field : fields) { + Object value = struct.get(field); + log.trace("Setting {} to {}.", field.name(), value); + document.addField(field.name(), value); + } + } else { + log.error("{}:{}:{} has an unsupported type for a value. Only Struct or Map are supported."); + throw new UnsupportedOperationException( + String.format("Unsupported value for type %s", record.value().getClass()) + ); + } + return document; + } +} diff --git a/src/main/java/io/confluent/connect/solr/sink/SolrSinkConnector.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/SolrSinkConnector.java similarity index 82% rename from src/main/java/io/confluent/connect/solr/sink/SolrSinkConnector.java rename to src/main/java/com/github/jcustenborder/kafka/connect/solr/SolrSinkConnector.java index bfbdc07..d5034b1 100644 --- a/src/main/java/io/confluent/connect/solr/sink/SolrSinkConnector.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/SolrSinkConnector.java @@ -1,11 +1,11 @@ /** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -13,9 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.confluent.connect.solr.sink; +package com.github.jcustenborder.kafka.connect.solr; -import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; import java.util.ArrayList; @@ -25,13 +24,13 @@ public abstract class SolrSinkConnector extends SinkConnector { + Map config; + @Override public String version() { return VersionUtil.getVersion(); } - Map config; - @Override public void start(Map map) { this.config = map; @@ -41,7 +40,7 @@ public void start(Map map) { public List> taskConfigs(int count) { List> results = new ArrayList<>(); - for(int i=0;i props) { + super(configDef, props); + this.commitWithin = this.getInt(SOLR_COMMIT_WITHIN_CONFIG); + this.ignoreUnknownFields = this.getBoolean(COLUMN_IGNORE_UNKNOWN_FIELDS_CONFIG); + this.username = this.getString(SOLR_USERNAME_CONFIG); + this.password = this.getPassword(SOLR_PASSWORD_CONFIG).value(); + + this.useBasicAuthentication = !Strings.isNullOrEmpty(this.username); + } + + public static ConfigDef config() { + return new ConfigDef() + .define(SOLR_COMMIT_WITHIN_CONFIG, ConfigDef.Type.INT, -1, ConfigDef.Importance.LOW, SOLR_COMMIT_WITHIN_DOC) + .define(COLUMN_IGNORE_UNKNOWN_FIELDS_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, COLUMN_IGNORE_UNKNOWN_FIELDS_DOC) + .define(SOLR_USERNAME_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, SOLR_USERNAME_DOC) + .define(SOLR_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD, "", ConfigDef.Importance.HIGH, SOLR_PASSWORD_DOC); + } +} diff --git a/src/main/java/com/github/jcustenborder/kafka/connect/solr/SolrSinkTask.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/SolrSinkTask.java new file mode 100644 index 0000000..5fb6885 --- /dev/null +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/SolrSinkTask.java @@ -0,0 +1,96 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.solr; + + +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.sink.SinkTask; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.response.UpdateResponse; +import org.apache.solr.common.SolrInputDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +public abstract class SolrSinkTask extends SinkTask { + private static final Logger log = LoggerFactory.getLogger(SolrSinkTask.class); + protected T config; + + protected abstract T config(Map settings); + + protected abstract SolrClient client(); + + protected SolrClient client; + + @Override + public void start(Map settings) { + log.info("Starting"); + this.config = config(settings); + log.info("Creating Solr client."); + this.client = client(); + log.info("Created Solr client {}.", this.client); + } + + @Override + public void put(Collection collection) { + UpdateRequest updateRequest = new UpdateRequest(); + + if (this.config.useBasicAuthentication) { + log.trace("Configuring UpdateRequest to use basic authentication. Username = '{}'", this.config.username); + updateRequest.setBasicAuthCredentials(this.config.username, this.config.password); + } + + int count = 0; + + for (SinkRecord record : collection) { + SolrInputDocument solrInputDocument = SolrInputDocumentBuilder.build(record); + updateRequest.add(solrInputDocument); + count++; + } + + try { + log.trace("Sending {} documents to solr.", count); + UpdateResponse response = updateRequest.process(this.client); + if (null != response && log.isTraceEnabled()) { + log.trace("ElapsedTime = {} QTime = {}", response.getElapsedTime(), response.getQTime()); + } + } catch (SolrServerException e) { + throw new RetriableException("Exception thrown while processing request", e); + } catch (IOException e) { + throw new RetriableException("Exception thrown while processing request", e); + } + } + + @Override + public void stop() { + try { + this.client.close(); + } catch (IOException e) { + log.error("Exception thrown while closing client.", e); + } + } + + @Override + public String version() { + return VersionUtil.getVersion(); + } +} diff --git a/src/main/java/io/confluent/connect/solr/sink/VersionUtil.java b/src/main/java/com/github/jcustenborder/kafka/connect/solr/VersionUtil.java similarity index 70% rename from src/main/java/io/confluent/connect/solr/sink/VersionUtil.java rename to src/main/java/com/github/jcustenborder/kafka/connect/solr/VersionUtil.java index f72b0b0..e34400e 100644 --- a/src/main/java/io/confluent/connect/solr/sink/VersionUtil.java +++ b/src/main/java/com/github/jcustenborder/kafka/connect/solr/VersionUtil.java @@ -1,11 +1,11 @@ /** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -13,12 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.confluent.connect.solr.sink; +package com.github.jcustenborder.kafka.connect.solr; class VersionUtil { - private VersionUtil(){} + private VersionUtil() { + } - public static String getVersion(){ + public static String getVersion() { return "0.0.0.0"; } diff --git a/src/main/java/io/confluent/connect/solr/sink/CloudSolrInputDocumentHandler.java b/src/main/java/io/confluent/connect/solr/sink/CloudSolrInputDocumentHandler.java deleted file mode 100644 index aec0925..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/CloudSolrInputDocumentHandler.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - -import io.confluent.connect.solr.sink.config.CloudSolrSinkTopicConfig; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.request.UpdateRequest; - -class CloudSolrInputDocumentHandler extends SolrInputDocumentHandler { - final CloudSolrSinkTopicConfig cloudSolrSinkTopicConfig; - final String collection; - - public String getCollection() { - return collection; - } - - public CloudSolrInputDocumentHandler(CloudSolrSinkTopicConfig topicConfig, SolrClient solrClient, SolrInputDocumentConverter solrInputDocumentConverter, CloudSolrSinkTopicConfig cloudSolrSinkTopicConfig) { - super(topicConfig, solrClient, solrInputDocumentConverter); - this.cloudSolrSinkTopicConfig = cloudSolrSinkTopicConfig; - this.collection = this.cloudSolrSinkTopicConfig.getCollectionName(); - } - - @Override - protected void beforeFlush(UpdateRequest updateRequest) { - updateRequest.setParam("collection", this.collection); - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/CloudSolrInputDocumentHandlerFactory.java b/src/main/java/io/confluent/connect/solr/sink/CloudSolrInputDocumentHandlerFactory.java deleted file mode 100644 index 5107cf1..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/CloudSolrInputDocumentHandlerFactory.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - -import io.confluent.connect.solr.sink.config.CloudSolrSinkConnectorConfig; -import io.confluent.connect.solr.sink.config.CloudSolrSinkTopicConfig; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.impl.CloudSolrClient; - -import java.util.List; -import java.util.Map; - -class CloudSolrInputDocumentHandlerFactory extends SolrInputDocumentHandlerFactory { - - CloudSolrSinkConnectorConfig cloudSolrSinkConnectorConfig; - SolrClient solrClient; - - @Override - public void initialize(Map props) { - this.cloudSolrSinkConnectorConfig = new CloudSolrSinkConnectorConfig(props); - - List zookeeperHost = this.cloudSolrSinkConnectorConfig.getZookeeperHosts(); - String chroot = this.cloudSolrSinkConnectorConfig.getZookeeperChroot(); - this.solrClient = new CloudSolrClient(zookeeperHost, chroot); - } - - @Override - protected SolrInputDocumentHandler create(String topic) { - CloudSolrSinkTopicConfig cloudSolrSinkTopicConfig = this.cloudSolrSinkConnectorConfig.getTopicConfig(topic); - SolrInputDocumentConverter solrInputDocumentConverter = cloudSolrSinkTopicConfig.getSolrInputDocumentConverter(); - - return new CloudSolrInputDocumentHandler(cloudSolrSinkTopicConfig, solrClient, solrInputDocumentConverter, cloudSolrSinkTopicConfig); - } - - @Override - public void close() throws Exception { - this.solrClient.close(); - } -} \ No newline at end of file diff --git a/src/main/java/io/confluent/connect/solr/sink/CloudSolrSinkConnector.java b/src/main/java/io/confluent/connect/solr/sink/CloudSolrSinkConnector.java deleted file mode 100644 index 720ea95..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/CloudSolrSinkConnector.java +++ /dev/null @@ -1,24 +0,0 @@ -package io.confluent.connect.solr.sink; - -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.Task; - -public class CloudSolrSinkConnector extends SolrSinkConnector { - @Override - public Class taskClass() { - return CloudSolrSinkTask.class; - } - - @Override - public ConfigDef config() { - - ConfigDef def = new ConfigDef(); - -// def. - - - - - return null; - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/CloudSolrSinkTask.java b/src/main/java/io/confluent/connect/solr/sink/CloudSolrSinkTask.java deleted file mode 100644 index fb3db4d..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/CloudSolrSinkTask.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.confluent.connect.solr.sink; - - -import java.util.Map; - -public class CloudSolrSinkTask extends SolrSinkTask { - @Override - protected SolrInputDocumentHandlerFactory getSolrInputDocumentHandlerFactory(Map map) { - CloudSolrInputDocumentHandlerFactory cloudSolrInputDocumentHandlerFactory = new CloudSolrInputDocumentHandlerFactory(); - cloudSolrInputDocumentHandlerFactory.initialize(map); - return cloudSolrInputDocumentHandlerFactory; - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/DefaultSolrInputDocumentConverter.java b/src/main/java/io/confluent/connect/solr/sink/DefaultSolrInputDocumentConverter.java deleted file mode 100644 index 2395c7e..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/DefaultSolrInputDocumentConverter.java +++ /dev/null @@ -1,20 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - -public class DefaultSolrInputDocumentConverter extends SolrInputDocumentConverter { - -} diff --git a/src/main/java/io/confluent/connect/solr/sink/HttpSolrInputDocumentHandler.java b/src/main/java/io/confluent/connect/solr/sink/HttpSolrInputDocumentHandler.java deleted file mode 100644 index deec8b5..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/HttpSolrInputDocumentHandler.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - -import io.confluent.connect.solr.sink.config.HttpSolrSinkTopicConfig; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.request.UpdateRequest; - -import java.io.File; - -class HttpSolrInputDocumentHandler extends SolrInputDocumentHandler { - public HttpSolrInputDocumentHandler(HttpSolrSinkTopicConfig topicConfig, SolrClient solrClient, SolrInputDocumentConverter solrInputDocumentConverter) { - super(topicConfig, solrClient, solrInputDocumentConverter); - - } - - @Override - protected void beforeFlush(UpdateRequest inputDocuments) { - - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/HttpSolrInputDocumentHandlerFactory.java b/src/main/java/io/confluent/connect/solr/sink/HttpSolrInputDocumentHandlerFactory.java deleted file mode 100644 index 912dbe0..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/HttpSolrInputDocumentHandlerFactory.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - -import io.confluent.connect.solr.sink.config.HttpSolrSinkConnectorConfig; -import io.confluent.connect.solr.sink.config.HttpSolrSinkTopicConfig; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.impl.ConcurrentUpdateSolrClient; -import org.apache.solr.client.solrj.impl.HttpSolrClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Map; - -class HttpSolrInputDocumentHandlerFactory extends SolrInputDocumentHandlerFactory { - private static final Logger log = LoggerFactory.getLogger(HttpSolrInputDocumentHandlerFactory.class); - HttpSolrSinkConnectorConfig httpSolrSinkConnectorConfig; - - @Override - public void initialize(Map props) { - this.httpSolrSinkConnectorConfig = new HttpSolrSinkConnectorConfig(props); - } - - @Override - protected SolrInputDocumentHandler create(String topic) { - HttpSolrSinkTopicConfig topicConfig = this.httpSolrSinkConnectorConfig.getTopicConfig(topic); - SolrInputDocumentConverter solrInputDocumentConverter = topicConfig.getSolrInputDocumentConverter(); - String coreName = topicConfig.getCoreName(); - String coreUrl; - try { - URL baseUrl = new URL(this.httpSolrSinkConnectorConfig.getSolrUrl()); - coreUrl = new URL(baseUrl,coreName).toString(); - } catch (MalformedURLException e) { - throw new ConnectException("Invalid Solr Url", e); - } - - if(log.isInfoEnabled()) { - log.info("Configuring topic '{}' to url '{}'", topicConfig.getCoreName(), coreUrl); - } - - SolrClient solrClient = new HttpSolrClient(coreUrl); - return new HttpSolrInputDocumentHandler(topicConfig, solrClient, solrInputDocumentConverter); - } - - @Override - public void close() throws Exception { - - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/HttpSolrSinkConnector.java b/src/main/java/io/confluent/connect/solr/sink/HttpSolrSinkConnector.java deleted file mode 100644 index 980263d..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/HttpSolrSinkConnector.java +++ /dev/null @@ -1,16 +0,0 @@ -package io.confluent.connect.solr.sink; - -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.connector.Task; - -public class HttpSolrSinkConnector extends SolrSinkConnector { - @Override - public Class taskClass() { - return HttpSolrSinkTask.class; - } - - @Override - public ConfigDef config() { - return null; - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/HttpSolrSinkTask.java b/src/main/java/io/confluent/connect/solr/sink/HttpSolrSinkTask.java deleted file mode 100644 index 0cdce68..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/HttpSolrSinkTask.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.confluent.connect.solr.sink; - - -import java.util.Map; - -public class HttpSolrSinkTask extends SolrSinkTask { - @Override - protected SolrInputDocumentHandlerFactory getSolrInputDocumentHandlerFactory(Map map) { - HttpSolrInputDocumentHandlerFactory httpSolrInputDocumentHandlerFactory = new HttpSolrInputDocumentHandlerFactory(); - httpSolrInputDocumentHandlerFactory.initialize(map); - return httpSolrInputDocumentHandlerFactory; - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/SolrInputDocumentConverter.java b/src/main/java/io/confluent/connect/solr/sink/SolrInputDocumentConverter.java deleted file mode 100644 index e6e0354..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/SolrInputDocumentConverter.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - -import io.confluent.connect.solr.sink.config.FieldConfig; -import io.confluent.connect.solr.sink.config.SolrSinkTopicConfig; -import org.apache.kafka.connect.data.Field; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkRecord; -import org.apache.solr.common.SolrInputDocument; - -import java.util.*; - -public abstract class SolrInputDocumentConverter { - Map connectFieldToSolrFieldMapping; - Map currencyFields; - - boolean ignoreUnknownFields; - - - public void configure(SolrSinkTopicConfig topicConfig){ - this.ignoreUnknownFields = topicConfig.ignoreUnknownFields(); - List fieldConfigs = topicConfig.getFieldConfigs(); - - Map connectFieldToSolrFieldMapping = new HashMap<>(); - Map currencyFields = new HashMap<>(); - - for(FieldConfig fieldConfig:fieldConfigs){ - connectFieldToSolrFieldMapping.put(fieldConfig.getStructField(), fieldConfig.getSolrFieldName()); - - if(null!=fieldConfig.getSolrFieldCurrency()){ - currencyFields.put(fieldConfig.getStructField(), fieldConfig.getSolrFieldCurrency()); - } - } - - this.connectFieldToSolrFieldMapping = connectFieldToSolrFieldMapping; - this.currencyFields = currencyFields; - } - - void convertField(final Struct row, Field field, SolrInputDocument solrInputDocument){ - final Object value = row.get(field); - - if(null==value && field.schema().isOptional()){ - return; - } - - String solrField = connectFieldToSolrFieldMapping.get(field.name()); - - if(null==solrField){ - solrField=field.name(); - } - - if(currencyFields.containsKey(field.name())){ - String currencyCode = currencyFields.get(field.name()); - String currency = String.format("%s,%s", value, currencyCode); - solrInputDocument.addField(solrField, currency); - } else { - switch(field.schema().type()){ - default: - solrInputDocument.addField(solrField, value); - break; - } - } - } - - public SolrInputDocument convert(SinkRecord sinkRecord){ - if(null==this.connectFieldToSolrFieldMapping||null==this.currencyFields){ - throw new ConnectException("configure() must be called before convert()."); - } - - if(null==sinkRecord) throw new NullPointerException("sinkRecord should not be null."); - if(!(sinkRecord.value() instanceof Struct)) throw new IllegalStateException("sinkRecord.value() should be struct."); - Struct valueStruct = (Struct)sinkRecord.value(); - SolrInputDocument solrInputDocument = new SolrInputDocument(); - - for(Field field:sinkRecord.valueSchema().fields()){ - convertField(valueStruct, field, solrInputDocument); - } - - return solrInputDocument; - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/SolrInputDocumentHandler.java b/src/main/java/io/confluent/connect/solr/sink/SolrInputDocumentHandler.java deleted file mode 100644 index bc15569..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/SolrInputDocumentHandler.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - -import io.confluent.connect.solr.sink.config.SolrSinkTopicConfig; -import org.apache.kafka.connect.sink.SinkRecord; -import org.apache.solr.client.solrj.SolrClient; -import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.request.UpdateRequest; -import org.apache.solr.client.solrj.response.UpdateResponse; -import org.apache.solr.common.SolrInputDocument; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -abstract class SolrInputDocumentHandler implements AutoCloseable { - private static final Logger log = LoggerFactory.getLogger(SolrInputDocumentHandler.class); - - private final String topic; - private final UpdateRequest updateRequest; - private final SolrInputDocumentConverter solrInputDocumentConverter; - - protected final SolrClient solrClient; - - public SolrInputDocumentHandler(SolrSinkTopicConfig topicConfig, SolrClient solrClient, SolrInputDocumentConverter solrInputDocumentConverter) { - this.topic = topicConfig.getTopic(); - this.solrClient = solrClient; - this.solrInputDocumentConverter = solrInputDocumentConverter; - this.updateRequest = new UpdateRequest(); - - if(null!=topicConfig.getCommitWithin()){ - if(log.isInfoEnabled()){ - log.info("Setting CommitWithin for UpdateRequest to {} ms for {}.", topicConfig.getCommitWithin(), this.topic); - } - this.updateRequest.setCommitWithin(topicConfig.getCommitWithin()); - } - } - - public String topic() { - return this.topic; - } - - protected abstract void beforeFlush(UpdateRequest inputDocuments); - - public void addRecord(SinkRecord record){ - SolrInputDocument solrInputDocument = this.solrInputDocumentConverter.convert(record); - this.updateRequest.add(solrInputDocument); - } - - @Override - public void close() throws Exception { - this.solrClient.close(); - } - - public void flush() throws IOException, SolrServerException { - int documentsToFlush = this.updateRequest.getDocuments().size(); - - if(log.isDebugEnabled()){ - log.debug("Writing {} document(s) for topic '{}' to solr.", documentsToFlush, this.topic); - } - - beforeFlush(this.updateRequest); - UpdateResponse updateResponse = updateRequest.process(this.solrClient); - - if(log.isInfoEnabled()){ - log.info("Wrote {} documents(s) for topic '{}' to solr in {} ms", documentsToFlush, this.topic, updateResponse.getElapsedTime()); - } - - this.updateRequest.clear(); - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/SolrInputDocumentHandlerFactory.java b/src/main/java/io/confluent/connect/solr/sink/SolrInputDocumentHandlerFactory.java deleted file mode 100644 index 136428b..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/SolrInputDocumentHandlerFactory.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - -import java.util.HashMap; -import java.util.Map; - -abstract class SolrInputDocumentHandlerFactory implements AutoCloseable { - public abstract void initialize(Map props); - protected abstract SolrInputDocumentHandler create(String topic); - - - final Map topicToSolrInputDocumentHandler; - - public SolrInputDocumentHandlerFactory(){ - this.topicToSolrInputDocumentHandler = new HashMap<>(); - } - - public final SolrInputDocumentHandler get(String topic) { - SolrInputDocumentHandler solrInputDocumentHandler = this.topicToSolrInputDocumentHandler.get(topic); - - if(null==solrInputDocumentHandler){ - solrInputDocumentHandler = create(topic); - this.topicToSolrInputDocumentHandler.put(topic, solrInputDocumentHandler); - } - - return solrInputDocumentHandler; - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/SolrSinkTask.java b/src/main/java/io/confluent/connect/solr/sink/SolrSinkTask.java deleted file mode 100644 index e1d975b..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/SolrSinkTask.java +++ /dev/null @@ -1,90 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - - -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.sink.SinkRecord; -import org.apache.kafka.connect.sink.SinkTask; -import org.apache.solr.client.solrj.SolrServerException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Collection; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -public abstract class SolrSinkTask extends SinkTask { - private static final Logger log = LoggerFactory.getLogger(SolrSinkTask.class); - SolrInputDocumentHandlerFactory solrInputDocumentHandlerFactory; - - protected abstract SolrInputDocumentHandlerFactory getSolrInputDocumentHandlerFactory(Map map); - - @Override - public String version() { - return VersionUtil.getVersion(); - } - - @Override - public void start(Map map) { - this.solrInputDocumentHandlerFactory = getSolrInputDocumentHandlerFactory(map); - } - - @Override - public void put(Collection collection) { - Set solrInputDocumentHandlers = new HashSet<>(); - - for(SinkRecord sinkRecord:collection){ - SolrInputDocumentHandler solrInputDocumentHandler = this.solrInputDocumentHandlerFactory.get(sinkRecord.topic()); - solrInputDocumentHandlers.add(solrInputDocumentHandler); - solrInputDocumentHandler.addRecord(sinkRecord); - } - - for(SolrInputDocumentHandler solrInputDocumentHandler:solrInputDocumentHandlers){ - if(log.isDebugEnabled()){ - log.debug("flushing documents for {}", solrInputDocumentHandler.topic()); - } - try { - solrInputDocumentHandler.flush(); - } catch(IOException|SolrServerException ex){ - throw new ConnectException( - String.format("Exception thrown while calling write to solr for topic '%s'", solrInputDocumentHandler.topic()), - ex - ); - } - } - } - - @Override - public void flush(Map map) { - - } - - @Override - public void stop() { - try { - this.solrInputDocumentHandlerFactory.close(); - } catch (Exception ex){ - if(log.isErrorEnabled()){ - log.error("Exception thrown while calling solrInputDocumentHandlerFactory.close();", ex); - } - } - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/config/CloudSolrSinkTopicConfig.java b/src/main/java/io/confluent/connect/solr/sink/config/CloudSolrSinkTopicConfig.java deleted file mode 100644 index cb6f580..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/config/CloudSolrSinkTopicConfig.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink.config; - -import org.apache.kafka.common.config.ConfigDef; - -import java.util.Map; - -public class CloudSolrSinkTopicConfig extends SolrSinkTopicConfig { - - public static final String COLLECTION_NAME_CONFIG = "collection.name"; - private static final String COLLECTION_NAME_DOC = "Name of the solr collection to write to."; - - static ConfigDef configDef(){ - return baseConfigDef() - .define(COLLECTION_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, COLLECTION_NAME_DOC) - ; - } - - public String getCollectionName(){ - return this.getString(COLLECTION_NAME_CONFIG); - } - - protected CloudSolrSinkTopicConfig(ConfigDef subclassConfigDef, Map props) { - super(subclassConfigDef, props); - } - - public CloudSolrSinkTopicConfig(Map props) { - this(configDef(), props); - } - -} diff --git a/src/main/java/io/confluent/connect/solr/sink/config/FieldConfig.java b/src/main/java/io/confluent/connect/solr/sink/config/FieldConfig.java deleted file mode 100644 index daf6f00..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/config/FieldConfig.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink.config; - - -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; - -import java.util.Map; - -public class FieldConfig extends AbstractConfig { - static ConfigDef config = baseConfigDef(); - - public static final String SOLR_FIELD_CONFIG = "field"; - private static final String SOLR_FIELD_DOC = "Name of the field in the solr schema."; - - public static final String SOLR_FIELD_CURRENCY_CONFIG = "currency"; - private static final String SOLR_FIELD_CURRENCY_DOC = "Currency code to use when writing data for field."; - - public static ConfigDef baseConfigDef() { - return new ConfigDef() - .define(SOLR_FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SOLR_FIELD_DOC) - .define(SOLR_FIELD_CURRENCY_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SOLR_FIELD_CURRENCY_DOC) - ; - } - - final String structField; - - public FieldConfig(String structField, Map props) { - super(config, props, false); - this.structField = structField; - } - - /** - * Name of the field of the incoming struct. - * @return - */ - public String getStructField() { - return structField; - } - - /** - * Name of the field in the solr schema. - * @return Name of the field in the solr schema. - */ - public String getSolrFieldName() { - return this.getString(SOLR_FIELD_CONFIG); - } - - /** - * Currency code for the field. - * @return - */ - public String getSolrFieldCurrency() { - return this.getString(SOLR_FIELD_CURRENCY_CONFIG); - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/config/HttpSolrSinkTopicConfig.java b/src/main/java/io/confluent/connect/solr/sink/config/HttpSolrSinkTopicConfig.java deleted file mode 100644 index 1fdfa41..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/config/HttpSolrSinkTopicConfig.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink.config; - -import org.apache.kafka.common.config.ConfigDef; - -import java.util.Map; - -public class HttpSolrSinkTopicConfig extends SolrSinkTopicConfig { - - public static final String CORE_NAME_CONFIG = "core.name"; - private static final String CORE_NAME_DOC = "Name of the solr collection to write to."; - - - static ConfigDef configDef(){ - return baseConfigDef() - .define(CORE_NAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, CORE_NAME_DOC) - ; - } - - public String getCoreName(){ - return this.getString(CORE_NAME_CONFIG); - } - - protected HttpSolrSinkTopicConfig(ConfigDef subclassConfigDef, Map props) { - super(subclassConfigDef, props); - } - - public HttpSolrSinkTopicConfig(Map props) { - this(configDef(), props); - } - -} diff --git a/src/main/java/io/confluent/connect/solr/sink/config/SolrSinkConnectorConfig.java b/src/main/java/io/confluent/connect/solr/sink/config/SolrSinkConnectorConfig.java deleted file mode 100644 index 6e65b59..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/config/SolrSinkConnectorConfig.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink.config; - -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.connect.errors.ConnectException; - -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public abstract class SolrSinkConnectorConfig extends AbstractConfig { - static ConfigDef config = baseConfigDef(); - - public static ConfigDef baseConfigDef() { - return new ConfigDef() - ; - } - - final Map topicConfigLookup; - - public SolrSinkConnectorConfig(Map props) { - this(config, props); - } - - protected SolrSinkConnectorConfig(ConfigDef subclassConfigDef, Map props) { - super(subclassConfigDef, props); - this.topicConfigLookup = loadSolr(); - } - - /** - * Method is used to create a SolrSinkTopicConfig specific to the SolrInputDocumentHandler. - * @param props - * @return - */ - protected T createTopicConfig(Map props) { - throw new UnsupportedOperationException(); - } - - /** - * Method is used to cycle through all of the potential topic specific configurations - * and create a lookup table by the topic. - * @return - */ - private Map loadSolr(){ - Pattern pattern = Pattern.compile("^solr\\d+\\."); - - Set prefixes = new LinkedHashSet<>(); - - Map input = originalsStrings(); - - for(Map.Entry kvp:input.entrySet()) { - Matcher matcher = pattern.matcher(kvp.getKey()); - - if(!matcher.find()){ - continue; - } - - String prefix = matcher.group(0); - prefixes.add(prefix); - } - - Map topicToConfig = new LinkedHashMap<>(); - - for(String prefix:prefixes){ - Map prefixedOriginals = this.originalsWithPrefix(prefix); - Map stringOriginals = new LinkedHashMap<>(); - for(Map.Entry kvp:prefixedOriginals.entrySet()){ - stringOriginals.put(kvp.getKey(), kvp.getValue().toString()); - } - T topicConfig = createTopicConfig(stringOriginals); - topicToConfig.put(topicConfig.getTopic(), topicConfig); - } - return topicToConfig; - } - - /** - * Method is used to return the config object for the requested topic. - * @param topic - * @return - * @exception ConnectException if the topic is not configured. - */ - public T getTopicConfig(String topic) { - T config = this.topicConfigLookup.get(topic); - if(null==config){ - throw new ConnectException( - String.format( - "Could not find configuration for topic '%s'", - topic - ) - ); - } - return config; - } -} diff --git a/src/main/java/io/confluent/connect/solr/sink/config/SolrSinkTopicConfig.java b/src/main/java/io/confluent/connect/solr/sink/config/SolrSinkTopicConfig.java deleted file mode 100644 index 0a84135..0000000 --- a/src/main/java/io/confluent/connect/solr/sink/config/SolrSinkTopicConfig.java +++ /dev/null @@ -1,110 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink.config; - -import io.confluent.connect.solr.sink.DefaultSolrInputDocumentConverter; -import io.confluent.connect.solr.sink.SolrInputDocumentConverter; -import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ConfigDef; - -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -public abstract class SolrSinkTopicConfig extends AbstractConfig { - public static final String TOPIC_CONFIG = "topic"; - private static final String TOPIC_DOC = "Kafka topic"; - - public static final String SOLR_INPUT_DOCUMENT_CONVERTER_CLASS_CONFIG = "solr.input.document.converter.class"; - private static final String SOLR_INPUT_DOCUMENT_CONVERTER_CLASS_DOC = "Factory class used to get the SolrClient implementation."; - - public static final String SOLR_COMMIT_WITHIN_CONFIG = "commit.within"; - private static final String SOLR_COMMIT_WITHIN_DOC = "Configures Solr UpdaterRequest for a commit within the requested number of milliseconds ."; - public static final String COLUMN_IGNORE_UNKNOWN_FIELDS_CONFIG = "column.ignore.unknown.fields"; - private static final String COLUMN_IGNORE_UNKNOWN_FIELDS_DOC = "Flag to determine if the connector should raise an exception when it encountered a field it doesn't have configured."; - - - public static ConfigDef baseConfigDef() { - return new ConfigDef() - .define(TOPIC_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, TOPIC_DOC) - .define(SOLR_INPUT_DOCUMENT_CONVERTER_CLASS_CONFIG, ConfigDef.Type.CLASS, DefaultSolrInputDocumentConverter.class.getName(), ConfigDef.Importance.HIGH, SOLR_INPUT_DOCUMENT_CONVERTER_CLASS_DOC) - .define(SOLR_COMMIT_WITHIN_CONFIG, ConfigDef.Type.INT, null, ConfigDef.Importance.LOW, SOLR_COMMIT_WITHIN_DOC) - .define(COLUMN_IGNORE_UNKNOWN_FIELDS_CONFIG, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.LOW, COLUMN_IGNORE_UNKNOWN_FIELDS_DOC) - ; - } - - final List fieldConfigs; - - protected SolrSinkTopicConfig(ConfigDef subclassConfigDef, Map props) { - super(subclassConfigDef, props); - this.fieldConfigs = loadFieldConfigs(); - } - - public SolrInputDocumentConverter getSolrInputDocumentConverter(){ - SolrInputDocumentConverter solrInputDocumentConverter = this.getConfiguredInstance(SOLR_INPUT_DOCUMENT_CONVERTER_CLASS_CONFIG, SolrInputDocumentConverter.class); - solrInputDocumentConverter.configure(this); - return solrInputDocumentConverter; - } - - public String getTopic() { - return this.getString(TOPIC_CONFIG); - } - - public Integer getCommitWithin() { - return this.getInt(SOLR_COMMIT_WITHIN_CONFIG); - } - - private List loadFieldConfigs(){ - Pattern pattern = Pattern.compile("^column\\.mappings\\.(.+)\\."); - - Map prefixes = new LinkedHashMap<>(); - - Map input = originalsStrings(); - - for(Map.Entry kvp:input.entrySet()) { - Matcher matcher = pattern.matcher(kvp.getKey()); - - if(!matcher.find()){ - continue; - } - - String prefix = matcher.group(0); - String sourceColumn = matcher.group(1); - prefixes.put(sourceColumn, prefix); - } - - List fieldConfigs = new ArrayList<>(); - - for(Map.Entry prefix:prefixes.entrySet()){ - Map prefixedOriginals = this.originalsWithPrefix(prefix.getValue()); - Map stringOriginals = new LinkedHashMap<>(); - for(Map.Entry kvp:prefixedOriginals.entrySet()){ - stringOriginals.put(kvp.getKey(), kvp.getValue().toString()); - } - FieldConfig fieldConfig = new FieldConfig(prefix.getKey(), stringOriginals); - fieldConfigs.add(fieldConfig); - } - return fieldConfigs; - } - - public List getFieldConfigs(){ - return this.fieldConfigs; - } - - public boolean ignoreUnknownFields(){ - return this.getBoolean(COLUMN_IGNORE_UNKNOWN_FIELDS_CONFIG); - } -} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfigTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfigTest.java new file mode 100644 index 0000000..98b5680 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/solr/CloudSolrSinkConnectorConfigTest.java @@ -0,0 +1,11 @@ +package com.github.jcustenborder.kafka.connect.solr; + +import io.confluent.kafka.connect.utils.config.MarkdownFormatter; +import org.junit.jupiter.api.Test; + +public class CloudSolrSinkConnectorConfigTest { + @Test + public void toMarkdown() { + System.out.println(MarkdownFormatter.toMarkdown(CloudSolrSinkConnectorConfig.config())); + } +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkConnectorConfigTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkConnectorConfigTest.java new file mode 100644 index 0000000..88dd264 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/solr/HttpSolrSinkConnectorConfigTest.java @@ -0,0 +1,46 @@ +/** + * Copyright © 2016 Jeremy Custenborder (jcustenborder@gmail.com) + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.jcustenborder.kafka.connect.solr; + +import com.google.common.collect.ImmutableMap; +import io.confluent.kafka.connect.utils.config.MarkdownFormatter; +import org.junit.jupiter.api.Test; + +import java.util.Map; + + +public class HttpSolrSinkConnectorConfigTest { + + public static Map settings() { + return ImmutableMap.of( + HttpSolrSinkConnectorConfig.SOLR_URL_CONFIG, "http://localhost:9231", + HttpSolrSinkConnectorConfig.CORE_NAME_CONFIG, "muffins" + + ); + } + + @Test + public void doc() { + System.out.println(MarkdownFormatter.toMarkdown(HttpSolrSinkConnectorConfig.config())); + } + + + @Test + public void foo() { + HttpSolrSinkConnectorConfig config = new HttpSolrSinkConnectorConfig(settings()); + } + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/solr/Records.java b/src/test/java/com/github/jcustenborder/kafka/connect/solr/Records.java new file mode 100644 index 0000000..2ff6ed2 --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/solr/Records.java @@ -0,0 +1,89 @@ +package com.github.jcustenborder.kafka.connect.solr; + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +class Records { + + public static class TestCase { + SinkRecord record; + + } + + public static class StructTestCase extends TestCase { + Struct struct; + } + + public static class MapTestCase extends TestCase { + Map map; + } + + static MapTestCase map() { + MapTestCase testCase = new MapTestCase(); + + testCase.map = ImmutableMap.of( + "firstName", "example", + "lastName", "user", + "email", "example.user@example.com", + "age", 27 + ); + testCase.record = new SinkRecord( + "testing", + 1, + null, + null, + null, + testCase.map, + 1L, + 1484897702123L, + TimestampType.CREATE_TIME + ); + + return testCase; + } + + static StructTestCase struct() { + StructTestCase testCase = new StructTestCase(); + + Schema schema = SchemaBuilder.struct() + .name("Testing") + .field("firstName", Schema.OPTIONAL_STRING_SCHEMA) + .field("lastName", Schema.OPTIONAL_STRING_SCHEMA) + .field("email", Schema.OPTIONAL_STRING_SCHEMA) + .field("age", Schema.OPTIONAL_INT32_SCHEMA) + .build(); + testCase.struct = new Struct(schema) + .put("firstName", "example") + .put("lastName", "user") + .put("email", "example.user@example.com") + .put("age", 27); + testCase.record = new SinkRecord( + "testing", + 1, + null, + null, + null, + testCase.struct, + 2L, + 1484897702123L, + TimestampType.CREATE_TIME + ); + + return testCase; + } + + static List records() { + return Arrays.asList( + struct().record, + map().record + ); + } +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/solr/SolrInputDocumentBuilderTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/solr/SolrInputDocumentBuilderTest.java new file mode 100644 index 0000000..5a8d70e --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/solr/SolrInputDocumentBuilderTest.java @@ -0,0 +1,39 @@ +package com.github.jcustenborder.kafka.connect.solr; + +import org.apache.kafka.connect.data.Field; +import org.apache.solr.common.SolrInputDocument; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class SolrInputDocumentBuilderTest { + + @Test + public void buildMap() { + Records.MapTestCase mapTestCase = Records.map(); + + SolrInputDocument document = SolrInputDocumentBuilder.build(mapTestCase.record); + assertNotNull(document, "document should not be null."); + for (Map.Entry kvp : mapTestCase.map.entrySet()) { + assertEquals(kvp.getValue(), document.getFieldValue(kvp.getKey()), kvp.getKey() + " does not match."); + } + } + + + @Test + public void buildStruct() { + Records.StructTestCase structTestCase = Records.struct(); + SolrInputDocument document = SolrInputDocumentBuilder.build(structTestCase.record); + assertNotNull(document, "document should not be null."); + List fields = structTestCase.struct.schema().fields(); + for (Field field : fields) { + Object value = structTestCase.struct.get(field); + assertEquals(value, document.getFieldValue(field.name()), field.name() + " does not match."); + } + } + +} diff --git a/src/test/java/com/github/jcustenborder/kafka/connect/solr/SolrSinkTaskTest.java b/src/test/java/com/github/jcustenborder/kafka/connect/solr/SolrSinkTaskTest.java new file mode 100644 index 0000000..3b7f7fa --- /dev/null +++ b/src/test/java/com/github/jcustenborder/kafka/connect/solr/SolrSinkTaskTest.java @@ -0,0 +1,68 @@ +package com.github.jcustenborder.kafka.connect.solr; + +import com.google.common.collect.ImmutableMap; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrRequest; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.common.util.NamedList; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + +public class SolrSinkTaskTest { + private static final Logger log = LoggerFactory.getLogger(SolrSinkTaskTest.class); + SolrSinkTask task; + SolrClient client; + + @BeforeEach + public void before() throws IOException, SolrServerException { + this.task = mock(SolrSinkTask.class, Mockito.CALLS_REAL_METHODS); + this.client = mock(SolrClient.class); + when(this.task.client()).thenReturn(this.client); + when(this.task.config(anyMap())).thenAnswer(invocationOnMock -> { + Map settings = invocationOnMock.getArgument(0); + return new SolrSinkConnectorConfig(SolrSinkConnectorConfig.config(), settings); + }); + + Map settings = ImmutableMap.of(); + this.task.start(settings); + when(this.client.request(any(UpdateRequest.class), any())).thenAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + log.trace("request"); + NamedList result = new NamedList<>(); + NamedList responseHeaders = new NamedList<>(); + responseHeaders.add("status", 200); + responseHeaders.add("QTime", 123); + result.add("responseHeader", responseHeaders); + return result; + } + }); + } + + @Test + public void test() { + List records = Records.records(); + this.task.put(records); + + + } + +} diff --git a/src/test/java/io/confluent/connect/solr/sink/DefaultSolrInputDocumentConverterTest.java b/src/test/java/io/confluent/connect/solr/sink/DefaultSolrInputDocumentConverterTest.java deleted file mode 100644 index 18429f5..0000000 --- a/src/test/java/io/confluent/connect/solr/sink/DefaultSolrInputDocumentConverterTest.java +++ /dev/null @@ -1,212 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink; - - -import java.util.Date; -import java.util.HashMap; -import java.util.Map; - -import io.confluent.connect.solr.sink.DefaultSolrInputDocumentConverter; -import io.confluent.connect.solr.sink.SolrInputDocumentConverter; -import io.confluent.connect.solr.sink.config.CloudSolrSinkTopicConfig; -import io.confluent.connect.solr.sink.config.SolrSinkTopicConfig; -import org.apache.kafka.connect.data.*; -import org.apache.solr.common.SolrInputDocument; -import org.apache.solr.common.SolrInputField; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -public class DefaultSolrInputDocumentConverterTest { - - SolrInputDocumentConverter solrInputDocumentConverter; - - @Before - public void before(){ - this.solrInputDocumentConverter = new DefaultSolrInputDocumentConverter(); - - Map props = new HashMap<>(); - props.put(SolrSinkTopicConfig.TOPIC_CONFIG, "foo"); - props.put(CloudSolrSinkTopicConfig.COLLECTION_NAME_CONFIG, "foo"); - CloudSolrSinkTopicConfig topicConfig = new CloudSolrSinkTopicConfig(props); - this.solrInputDocumentConverter.configure(topicConfig); - } - - private void convertField(Schema fieldSchema, final Object inputValue, final Object expectedValue){ - final String FIELD_NAME = "asdf"; - final String TOPIC="test"; - final Schema valueSchema = SchemaBuilder.struct(). - field(FIELD_NAME, fieldSchema) - .build(); - final Field field = valueSchema.field(FIELD_NAME); - final Struct value = new Struct(valueSchema) - .put(FIELD_NAME, inputValue); - SolrInputDocument solrInputDocument = new SolrInputDocument(); - this.solrInputDocumentConverter.convertField(value, field, solrInputDocument); - - SolrInputField solrInputField = solrInputDocument.getField(FIELD_NAME); - - if(null==expectedValue){ - Assert.assertNull( - String.format("SolrInputDocument should not have '%s' field.", FIELD_NAME), - solrInputField - ); - } else { - Assert.assertNotNull( - String.format("SolrInputDocument should have '%s' field.", FIELD_NAME), - solrInputField - ); - - final Object actualValue = solrInputField.getValue(); - Assert.assertEquals( - String.format("'%s' field of SolrInputDocument does not have the expected value.", FIELD_NAME), - expectedValue, - actualValue - ); - } - } - - @Test - public void convertField_timestamp_schema(){ - Schema schema = Timestamp.SCHEMA; - final Date expected = new Date(); - convertField(schema, expected, expected); - } - - @Test - public void convertField_optional_timestamp_schema(){ - Schema schema = Timestamp.builder().optional().build(); - final Date expected = new Date(); - convertField(schema, expected, expected); - convertField(schema, null, null); - } - - @Test - public void convertField_int8_schema(){ - convertField(Schema.INT8_SCHEMA, Byte.MAX_VALUE, Byte.MAX_VALUE); - convertField(Schema.INT8_SCHEMA, Byte.MIN_VALUE, Byte.MIN_VALUE); - convertField(Schema.INT8_SCHEMA, (byte)0, (byte)0); - } - - @Test - public void convertField_optional_int8_schema(){ - convertField(Schema.OPTIONAL_INT8_SCHEMA, Byte.MAX_VALUE, Byte.MAX_VALUE); - convertField(Schema.OPTIONAL_INT8_SCHEMA, Byte.MIN_VALUE, Byte.MIN_VALUE); - convertField(Schema.OPTIONAL_INT8_SCHEMA, (byte)0, (byte)0); - convertField(Schema.OPTIONAL_INT8_SCHEMA, null, null); - } - - @Test - public void convertField_int16_schema(){ - convertField(Schema.INT16_SCHEMA, Short.MAX_VALUE, Short.MAX_VALUE); - convertField(Schema.INT16_SCHEMA, Short.MIN_VALUE, Short.MIN_VALUE); - convertField(Schema.INT16_SCHEMA, (short)0, (short)0); - } - - @Test - public void convertField_optional_int16_schema(){ - convertField(Schema.OPTIONAL_INT16_SCHEMA, Short.MAX_VALUE, Short.MAX_VALUE); - convertField(Schema.OPTIONAL_INT16_SCHEMA, Short.MIN_VALUE, Short.MIN_VALUE); - convertField(Schema.OPTIONAL_INT16_SCHEMA, (short)0, (short)0); - convertField(Schema.OPTIONAL_INT16_SCHEMA, null, null); - } - - - @Test - public void convertField_int32_schema(){ - convertField(Schema.INT32_SCHEMA, Integer.MAX_VALUE, Integer.MAX_VALUE); - convertField(Schema.INT32_SCHEMA, Integer.MIN_VALUE, Integer.MIN_VALUE); - convertField(Schema.INT32_SCHEMA, 0, 0); - } - - @Test - public void convertField_optional_int32_schema(){ - convertField(Schema.OPTIONAL_INT32_SCHEMA, Integer.MAX_VALUE, Integer.MAX_VALUE); - convertField(Schema.OPTIONAL_INT32_SCHEMA, Integer.MIN_VALUE, Integer.MIN_VALUE); - convertField(Schema.OPTIONAL_INT32_SCHEMA, 0, 0); - convertField(Schema.OPTIONAL_INT32_SCHEMA, null, null); - } - - @Test - public void convertField_int64_schema(){ - convertField(Schema.INT64_SCHEMA, Long.MAX_VALUE, Long.MAX_VALUE); - convertField(Schema.INT64_SCHEMA, Long.MIN_VALUE, Long.MIN_VALUE); - convertField(Schema.INT64_SCHEMA, 0L, 0L); - } - - @Test - public void convertField_optional_int64_schema(){ - convertField(Schema.OPTIONAL_INT64_SCHEMA, Long.MAX_VALUE, Long.MAX_VALUE); - convertField(Schema.OPTIONAL_INT64_SCHEMA, Long.MIN_VALUE, Long.MIN_VALUE); - convertField(Schema.OPTIONAL_INT64_SCHEMA, 0L, 0L); - convertField(Schema.OPTIONAL_INT64_SCHEMA, null, null); - } - - @Test - public void convertField_float32_schema(){ - convertField(Schema.FLOAT32_SCHEMA, Float.MAX_VALUE, Float.MAX_VALUE); - convertField(Schema.FLOAT32_SCHEMA, Float.MIN_VALUE, Float.MIN_VALUE); - convertField(Schema.FLOAT32_SCHEMA, 0.0F, 0.0F); - } - - @Test - public void convertField_optional_float32_schema(){ - convertField(Schema.OPTIONAL_FLOAT32_SCHEMA, Float.MAX_VALUE, Float.MAX_VALUE); - convertField(Schema.OPTIONAL_FLOAT32_SCHEMA, Float.MIN_VALUE, Float.MIN_VALUE); - convertField(Schema.OPTIONAL_FLOAT32_SCHEMA, 0.0F, 0.0F); - convertField(Schema.OPTIONAL_FLOAT32_SCHEMA, null, null); - } - - @Test - public void convertField_float64_schema(){ - convertField(Schema.FLOAT64_SCHEMA, Double.MAX_VALUE, Double.MAX_VALUE); - convertField(Schema.FLOAT64_SCHEMA, Double.MIN_VALUE, Double.MIN_VALUE); - convertField(Schema.FLOAT64_SCHEMA, 0.0D, 0.0D); - } - - @Test - public void convertField_optional_float64_schema(){ - convertField(Schema.OPTIONAL_FLOAT64_SCHEMA, Double.MAX_VALUE, Double.MAX_VALUE); - convertField(Schema.OPTIONAL_FLOAT64_SCHEMA, Double.MIN_VALUE, Double.MIN_VALUE); - convertField(Schema.OPTIONAL_FLOAT64_SCHEMA, 0.0D, 0.0D); - convertField(Schema.OPTIONAL_FLOAT64_SCHEMA, null, null); - } - - @Test - public void convertField_boolean_schema(){ - convertField(Schema.BOOLEAN_SCHEMA, Boolean.TRUE, Boolean.TRUE); - convertField(Schema.BOOLEAN_SCHEMA, Boolean.FALSE, Boolean.FALSE); - } - - @Test - public void convertField_optional_boolean_schema(){ - convertField(Schema.OPTIONAL_BOOLEAN_SCHEMA, Boolean.TRUE, Boolean.TRUE); - convertField(Schema.OPTIONAL_BOOLEAN_SCHEMA, Boolean.FALSE, Boolean.FALSE); - convertField(Schema.OPTIONAL_BOOLEAN_SCHEMA, null, null); - } - - @Test - public void convertField_string_schema(){ - convertField(Schema.STRING_SCHEMA, "This is a test", "This is a test"); - } - - @Test - public void convertField_optional_string_schema(){ - convertField(Schema.OPTIONAL_STRING_SCHEMA, "This is a test", "This is a test"); - convertField(Schema.OPTIONAL_STRING_SCHEMA, null, null); - } -} diff --git a/src/test/java/io/confluent/connect/solr/sink/config/CloudSolrInputDocumentHandlerConfigTest.java b/src/test/java/io/confluent/connect/solr/sink/config/CloudSolrInputDocumentHandlerConfigTest.java deleted file mode 100644 index 2e78e83..0000000 --- a/src/test/java/io/confluent/connect/solr/sink/config/CloudSolrInputDocumentHandlerConfigTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink.config; - - -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; - -public class CloudSolrInputDocumentHandlerConfigTest { - Map props; - CloudSolrSinkConnectorConfig config; - - @Before - public void before(){ - this.props = new LinkedHashMap<>(); - this.props.put(CloudSolrSinkConnectorConfig.ZOOKEEPER_CHROOT_CONFIG, "/solr"); - this.props.put(CloudSolrSinkConnectorConfig.ZOOKEEPER_HOSTS_CONFIG, "server1:2181,server2:2181,server3:2181"); - - this.config = new CloudSolrSinkConnectorConfig(this.props); - } - - @Test - public void getZookeeperChroot(){ - final String expected = "/solr"; - final String actual = this.config.getZookeeperChroot(); - Assert.assertEquals(expected, actual); - } - - @Test - public void getZookeeperChroot_null(){ - this.props.remove(CloudSolrSinkConnectorConfig.ZOOKEEPER_CHROOT_CONFIG); - this.config = new CloudSolrSinkConnectorConfig(props); - final String expected = null; - final String actual = this.config.getZookeeperChroot(); - Assert.assertEquals(expected, actual); - } - - @Test - public void getZookeeperHosts(){ - final List expected = Arrays.asList("server1:2181", "server2:2181", "server3:2181"); - final List actual = this.config.getZookeeperHosts(); - Assert.assertEquals(expected, actual); - } - - @Test(expected = org.apache.kafka.common.config.ConfigException.class) - public void getZookeeperHosts_null(){ - this.props.remove(CloudSolrSinkConnectorConfig.ZOOKEEPER_HOSTS_CONFIG); - this.config = new CloudSolrSinkConnectorConfig(props); - } - - @Test - public void toHtmlTable(){ - System.out.println(CloudSolrSinkConnectorConfig.configDef().toHtmlTable()); - } -} diff --git a/src/test/java/io/confluent/connect/solr/sink/config/HttpSolrSinkConnectorConfigTest.java b/src/test/java/io/confluent/connect/solr/sink/config/HttpSolrSinkConnectorConfigTest.java deleted file mode 100644 index 06fff1b..0000000 --- a/src/test/java/io/confluent/connect/solr/sink/config/HttpSolrSinkConnectorConfigTest.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink.config; - -/** - * Created by jeremy on 3/14/16. - */ -public class HttpSolrSinkConnectorConfigTest { -} diff --git a/src/test/java/io/confluent/connect/solr/sink/config/SolrSinkConnectorConfigTest.java b/src/test/java/io/confluent/connect/solr/sink/config/SolrSinkConnectorConfigTest.java deleted file mode 100644 index 13aa04a..0000000 --- a/src/test/java/io/confluent/connect/solr/sink/config/SolrSinkConnectorConfigTest.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Copyright (C) 2016 Jeremy Custenborder (jcustenborder@gmail.com) - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.confluent.connect.solr.sink.config; - -/** - * Created by jeremy on 3/14/16. - */ -public class SolrSinkConnectorConfigTest { -} diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100644 index 0000000..f854600 --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,16 @@ + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + \ No newline at end of file