Skip to content

Commit

Permalink
Merge pull request #951 from irajhedayati/retrieve-avro-schema-from-s…
Browse files Browse the repository at this point in the history
…chemaregistry

Query Avro schema from schema registry if it is missing
  • Loading branch information
HenryCaiHaiying authored Sep 23, 2019
2 parents 6572188 + b7fd873 commit da6d8b9
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
package com.pinterest.secor.common;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -44,7 +47,7 @@ public SecorSchemaRegistryClient(SecorConfig config) {
props.put("schema.registry.url", config.getSchemaRegistryUrl());
schemaRegistryClient = new CachedSchemaRegistryClient(config.getSchemaRegistryUrl(), 30);
init(config);
} catch (Exception e){
} catch (Exception e) {
LOG.error("Error initalizing schema registry", e);
throw new RuntimeException(e);
}
Expand All @@ -67,10 +70,27 @@ public GenericRecord deserialize(String topic, byte[] message) {
return record;
}

/**
 * Returns the Avro schema for a Kafka topic.
 *
 * <p>The per-topic cache is consulted first; it is filled either by this method or by
 * {@link #deserialize(String, byte[])}, so Schema Registry is not hit on every call.
 * On a cache miss the latest schema registered under the subject {@code <topic>-value}
 * (the standard "subject name" strategy) is fetched from Schema Registry and cached.
 *
 * @param topic a Kafka topic to query the schema for
 * @return Schema object for the topic
 * @throws IllegalStateException if there is no schema registered for this topic or it is not
 *         able to fetch it; the underlying exception is attached as the cause
 */
public Schema getSchema(String topic) {
    Schema schema = schemas.get(topic);
    if (schema == null) {
        try {
            SchemaMetadata schemaMetadata = schemaRegistryClient.getLatestSchemaMetadata(topic + "-value");
            schema = schemaRegistryClient.getByID(schemaMetadata.getId());
            schemas.put(topic, schema);
        } catch (IOException e) {
            // Message text is kept verbatim: callers/tests match on the
            // "Avro schema not found for topic ..." substring. Only the cause is added.
            throw new IllegalStateException("Unable to get Avro schema not found for topic " + topic, e);
        } catch (RestClientException e) {
            throw new IllegalStateException("Avro schema not found for topic " + topic, e);
        }
    }
    return schema;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,23 @@

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import junit.framework.TestCase;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.commons.lang3.StringUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.modules.junit4.PowerMockRunner;

import java.util.Properties;
import java.io.IOException;

import static org.mockito.Mockito.when;

Expand All @@ -45,10 +46,11 @@ public class SecorSchemaRegistryClientTest extends TestCase {
private KafkaAvroDeserializer kafkaAvroDeserializer;
private SchemaRegistryClient schemaRegistryClient;
private SecorSchemaRegistryClient secorSchemaRegistryClient;
private SecorConfig secorConfig;
private SpecificDatumWriter<GenericRecord> writer;
private KafkaAvroSerializer avroSerializer;

@Rule
public ExpectedException exception = ExpectedException.none();

@Override
public void setUp() {
initKafka();
Expand All @@ -62,13 +64,11 @@ public void setUp() {
/**
 * Creates the in-memory schema registry mock and wires the Avro deserializer and
 * serializer to it, so both sides of the test share the same registered schemas.
 */
private void initKafka() {
    schemaRegistryClient = new MockSchemaRegistryClient();
    kafkaAvroDeserializer = new KafkaAvroDeserializer(schemaRegistryClient);
    avroSerializer = new KafkaAvroSerializer(schemaRegistryClient);
}

@Test
public void testDecodeMessage() throws Exception {
public void testDecodeMessage() {
Schema schemaV1 = SchemaBuilder.record("Foo")
.fields()
.name("data_field_1").type().intType().noDefault()
Expand Down Expand Up @@ -104,4 +104,23 @@ public void testDecodeMessage() throws Exception {
output = secorSchemaRegistryClient.deserialize("test-avr-topic", new byte[0]);
assertNull(output);
}

/**
 * Registers a schema under the "-value" subject of a topic and checks that
 * {@code getSchema} returns an equal schema for that topic.
 */
@Test
public void testGetSchema() throws IOException, RestClientException {
    // Build the record schema that will be registered for the topic.
    Schema registered = SchemaBuilder.record("Foo")
            .fields()
            .name("data_field_1").type().intType().noDefault()
            .name("timestamp").type().longType().noDefault()
            .endRecord();
    schemaRegistryClient.register("test-avr-topic-2-value", registered);

    // Look the schema up by topic name only.
    Schema fetched = secorSchemaRegistryClient.getSchema("test-avr-topic-2");

    assertEquals(registered, fetched);
}

/**
 * Expects {@code getSchema} to fail with an {@link IllegalStateException} whose message
 * names the topic when nothing has been registered for it in this test.
 */
@Test
public void testGetSchemaDoesNotExist() {
    final String unregisteredTopic = "test-avr-topic-3";

    // Expectations must be declared before the call that is supposed to throw.
    exception.expect(IllegalStateException.class);
    exception.expectMessage("Avro schema not found for topic test-avr-topic-3");

    secorSchemaRegistryClient.getSchema(unregisteredTopic);
}
}

0 comments on commit da6d8b9

Please sign in to comment.