Skip to content

Commit

Permalink
NIFI-5640: Improved efficiency of Avro Reader and some methods of AvroTypeUtil
Browse files Browse the repository at this point in the history
Also switched ServiceStateTransition to using read/write locks instead of synchronized blocks because profiling showed that significant time was spent in determining the state of a Controller Service when attempting to use it. Switching to a read lock should provide better performance there.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes apache#3036
  • Loading branch information
markap14 authored and mattyb149 committed Sep 27, 2018
1 parent ad4c886 commit 2e1005e
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,25 @@

package org.apache.nifi.serialization;

import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;

public class SimpleRecordSchema implements RecordSchema {
private List<RecordField> fields = null;
private Map<String, RecordField> fieldMap = null;
private final boolean textAvailable;
private final String text;
private final AtomicReference<String> text = new AtomicReference<>();
private final String schemaFormat;
private final SchemaIdentifier schemaIdentifier;

Expand All @@ -50,6 +51,10 @@ public SimpleRecordSchema(final String text, final String schemaFormat, final Sc
this(text, schemaFormat, true, id);
}

// Creates a schema with the given identifier but no schema text and no schema
// format (delegates with text = null, schemaFormat = null, textAvailable = false,
// so getSchemaText() will report the text as unavailable).
public SimpleRecordSchema(final SchemaIdentifier id) {
this(null, null, false, id);
}

// Creates a schema with the given fields, pre-computed schema text, and schema
// format; delegates with textAvailable = true, marking the supplied text as
// available to getSchemaText().
public SimpleRecordSchema(final List<RecordField> fields, final String text, final String schemaFormat, final SchemaIdentifier id) {
this(fields, text, schemaFormat, true, id);
}
Expand All @@ -60,7 +65,7 @@ private SimpleRecordSchema(final List<RecordField> fields, final String text, fi
}

private SimpleRecordSchema(final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) {
this.text = text;
this.text.set(text);
this.schemaFormat = schemaFormat;
this.schemaIdentifier = id;
this.textAvailable = textAvailable;
Expand All @@ -69,7 +74,7 @@ private SimpleRecordSchema(final String text, final String schemaFormat, final b
@Override
public Optional<String> getSchemaText() {
if (textAvailable) {
return Optional.ofNullable(text);
return Optional.ofNullable(text.get());
} else {
return Optional.empty();
}
Expand Down Expand Up @@ -121,13 +126,13 @@ public RecordField getField(final int index) {

@Override
public List<DataType> getDataTypes() {
return getFields().stream().map(recordField -> recordField.getDataType())
return getFields().stream().map(RecordField::getDataType)
.collect(Collectors.toList());
}

@Override
public List<String> getFieldNames() {
return getFields().stream().map(recordField -> recordField.getFieldName())
return getFields().stream().map(RecordField::getFieldName)
.collect(Collectors.toList());
}

Expand Down Expand Up @@ -189,7 +194,19 @@ private static String createText(final List<RecordField> fields) {

@Override
public String toString() {
return text;
String textValue = text.get();
if (textValue != null) {
return textValue;
}

textValue = createText(fields);
final boolean updated = text.compareAndSet(null, textValue);

if (updated) {
return textValue;
} else {
return text.get();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,6 @@

package org.apache.nifi.avro;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.apache.avro.Conversions;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
Expand Down Expand Up @@ -72,6 +50,27 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class AvroTypeUtil {
private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class);
public static final String AVRO_SCHEMA_FORMAT = "avro";
Expand Down Expand Up @@ -308,7 +307,7 @@ public static DataType determineDataType(final Schema avroSchema, Map<String, Da
if (knownRecordTypes.containsKey(schemaFullName)) {
return knownRecordTypes.get(schemaFullName);
} else {
SimpleRecordSchema recordSchema = new SimpleRecordSchema(avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY);
SimpleRecordSchema recordSchema = new SimpleRecordSchema(SchemaIdentifier.EMPTY);
DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
knownRecordTypes.put(schemaFullName, recordSchemaType);

Expand Down Expand Up @@ -353,23 +352,33 @@ public static DataType determineDataType(final Schema avroSchema, Map<String, Da
return null;
}

private static List<Schema> getNonNullSubSchemas(Schema avroSchema) {
List<Schema> unionFieldSchemas = avroSchema.getTypes();
private static List<Schema> getNonNullSubSchemas(final Schema avroSchema) {
final List<Schema> unionFieldSchemas = avroSchema.getTypes();
if (unionFieldSchemas == null) {
return Collections.emptyList();
}
return unionFieldSchemas.stream()
.filter(s -> s.getType() != Type.NULL)
.collect(Collectors.toList());

final List<Schema> nonNullTypes = new ArrayList<>(unionFieldSchemas.size());
for (final Schema fieldSchema : unionFieldSchemas) {
if (fieldSchema.getType() != Type.NULL) {
nonNullTypes.add(fieldSchema);
}
}

return nonNullTypes;
}

/**
 * Converts the given Avro Schema into a RecordSchema, including the textual
 * representation of the schema. Equivalent to {@code createSchema(avroSchema, true)}.
 *
 * @param avroSchema the Avro Schema to convert; must not be {@code null}
 * @return the corresponding RecordSchema
 */
public static RecordSchema createSchema(final Schema avroSchema) {
return createSchema(avroSchema, true);
}

public static RecordSchema createSchema(final Schema avroSchema, final boolean includeText) {
if (avroSchema == null) {
throw new IllegalArgumentException("Avro Schema cannot be null");
}

SchemaIdentifier identifier = new StandardSchemaIdentifier.Builder().name(avroSchema.getName()).build();
return createSchema(avroSchema, avroSchema.toString(), identifier);
return createSchema(avroSchema, includeText ? avroSchema.toString() : null, identifier);
}

/**
Expand All @@ -385,10 +394,10 @@ public static RecordSchema createSchema(final Schema avroSchema, final String sc
throw new IllegalArgumentException("Avro Schema cannot be null");
}

String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName();
SimpleRecordSchema recordSchema = new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId);
DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
Map<String, DataType> knownRecords = new HashMap<>();
final String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName();
final SimpleRecordSchema recordSchema = schemaText == null ? new SimpleRecordSchema(schemaId) : new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId);
final DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
final Map<String, DataType> knownRecords = new HashMap<>();
knownRecords.put(schemaFullName, recordSchemaType);

final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
Expand Down Expand Up @@ -752,36 +761,39 @@ public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avr
* @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value
* @return a converted value
*/
private static Object convertUnionFieldValue(Object originalValue, Schema fieldSchema, Function<Schema, Object> conversion, final String fieldName) {
// Ignore null types in union
final List<Schema> nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema);

// If at least one non-null type exists, find the first compatible type
if (nonNullFieldSchemas.size() >= 1) {
for (final Schema nonNullFieldSchema : nonNullFieldSchemas) {
final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema);
try {
final Object convertedValue = conversion.apply(nonNullFieldSchema);

if (isCompatibleDataType(convertedValue, desiredDataType)) {
return convertedValue;
}
private static Object convertUnionFieldValue(final Object originalValue, final Schema fieldSchema, final Function<Schema, Object> conversion, final String fieldName) {
boolean foundNonNull = false;
for (final Schema subSchema : fieldSchema.getTypes()) {
if (subSchema.getType() == Type.NULL) {
continue;
}

// For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
if (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) {
return convertedValue;
}
} catch (Exception e) {
// If failed with one of possible types, continue with the next available option.
if (logger.isDebugEnabled()) {
logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e);
}
foundNonNull = true;
final DataType desiredDataType = AvroTypeUtil.determineDataType(subSchema);
try {
final Object convertedValue = conversion.apply(subSchema);

if (isCompatibleDataType(convertedValue, desiredDataType)) {
return convertedValue;
}

// For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
if (subSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) {
return convertedValue;
}
} catch (Exception e) {
// If failed with one of possible types, continue with the next available option.
if (logger.isDebugEnabled()) {
logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e);
}
}
}

if (foundNonNull) {
throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass()
+ " because no compatible types exist in the UNION for field " + fieldName);
}

return null;
}

Expand Down Expand Up @@ -875,7 +887,7 @@ private static Object normalizeValue(final Object value, final Schema avroSchema
final Object fieldValue = normalizeValue(avroFieldValue, field.schema(), fieldName + "/" + field.name());
values.put(field.name(), fieldValue);
}
final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema);
final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema, false);
return new MapRecord(childSchema, values);
case BYTES:
final ByteBuffer bb = (ByteBuffer) value;
Expand Down
Loading

0 comments on commit 2e1005e

Please sign in to comment.