diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle
index 8014e9fde055f..5c68ed2d591a9 100644
--- a/sdks/java/core/build.gradle
+++ b/sdks/java/core/build.gradle
@@ -36,7 +36,7 @@ applyJavaNature(
relocate "org.antlr.v4", getJavaRelocatedPath("org.antlr.v4")
},
)
-applyAvroNature()
+//applyAvroNature()
applyAntlrNature()
generateGrammarSource {
@@ -89,7 +89,7 @@ dependencies {
shadow library.java.jackson_annotations
shadow library.java.jackson_databind
shadow library.java.slf4j_api
- shadow library.java.avro
+// shadow library.java.avro
shadow library.java.snappy_java
shadow library.java.joda_time
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
@@ -114,7 +114,7 @@ dependencies {
shadowTest "com.esotericsoftware.kryo:kryo:2.21"
shadowTest library.java.quickcheck_core
shadowTest library.java.quickcheck_generators
- shadowTest library.java.avro_tests
+// shadowTest library.java.avro_tests
shadowTest library.java.zstd_jni
shadowTest library.java.commons_logging
shadowTest library.java.log4j
@@ -123,6 +123,6 @@ dependencies {
testRuntimeOnly library.java.slf4j_jdk14
}
-project.tasks.compileTestJava {
- options.compilerArgs += ['-Xlint:-rawtypes'] // generated avro uses rawtypes without suppression
-}
+//project.tasks.compileTestJava {
+// options.compilerArgs += ['-Xlint:-rawtypes'] // generated avro uses rawtypes without suppression
+//}
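
The Gradle changes above drop Avro from the core SDK's dependency set; downstream users pick the same classes up from the beam-sdks-java-extensions-avro artifact instead. A minimal migration sketch, assuming that artifact is on the classpath (the record schema below is illustrative, not from this PR):

```java
// The coder keeps its name and factory methods; only the package changes:
//   old: org.apache.beam.sdk.coders.AvroCoder
//   new: org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;

public class AvroCoderMigration {
  public static void main(String[] args) {
    // An illustrative record schema; any valid Avro schema works the same way.
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":"
                    + "[{\"name\":\"name\",\"type\":\"string\"}]}");
    // AvroCoder.of(Schema) still returns a coder for GenericRecord.
    AvroCoder<GenericRecord> coder = AvroCoder.of(schema);
    System.out.println(coder.getSchema());
  }
}
```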
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
deleted file mode 100644
index 8fa162ecf8e4a..0000000000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroCoder.java
+++ /dev/null
@@ -1,820 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.google.errorprone.annotations.FormatMethod;
-import com.google.errorprone.annotations.FormatString;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.SortedSet;
-import org.apache.avro.AvroRuntimeException;
-import org.apache.avro.Conversion;
-import org.apache.avro.LogicalType;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DatumWriter;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.reflect.AvroEncode;
-import org.apache.avro.reflect.AvroName;
-import org.apache.avro.reflect.AvroSchema;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.avro.reflect.Union;
-import org.apache.avro.specific.SpecificData;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.avro.specific.SpecificDatumWriter;
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.avro.util.ClassUtils;
-import org.apache.avro.util.Utf8;
-import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
-import org.apache.beam.sdk.values.TypeDescriptor;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-
-/**
- * A {@link Coder} using Avro binary format.
- *
- * <p>Each instance of {@code AvroCoder<T>} encapsulates an Avro schema for objects of type {@code
- * T}.
- *
- * <p>The Avro schema may be provided explicitly via {@link AvroCoder#of(Class, Schema)} or omitted
- * via {@link AvroCoder#of(Class)}, in which case it will be inferred using Avro's {@link
- * org.apache.avro.reflect.ReflectData}.
- *
- * <p>For complete details about schema generation and how it can be controlled please see the
- * {@link org.apache.avro.reflect} package. Only concrete classes with a no-argument constructor can
- * be mapped to Avro records. All inherited fields that are not static or transient are included.
- * Fields are not permitted to be null unless annotated by {@link Nullable} or a {@link Union}
- * schema containing {@code "null"}.
- *
- * <p>To use, specify the {@code Coder} type on a PCollection:
- *
- * <pre>{@code
- * PCollection<MyCustomElement> records =
- *     input.apply(...)
- *          .setCoder(AvroCoder.of(MyCustomElement.class));
- * }</pre>
- *
- * <p>or annotate the element class using {@code @DefaultCoder}.
- *
- * <pre>{@code @DefaultCoder(AvroCoder.class)
- * public class MyCustomElement {
- *   ...
- * }
- * }</pre>
- *
- * <p>The implementation attempts to determine if the Avro encoding of the given type will satisfy
- * the criteria of {@link Coder#verifyDeterministic} by inspecting both the type and the Schema
- * provided or generated by Avro. Only coders that are deterministic can be used in {@link
- * org.apache.beam.sdk.transforms.GroupByKey} operations.
- *
- * @param <T> the type of elements handled by this coder
- * @deprecated Avro related classes are deprecated in module <code>beam-sdks-java-core</code> and
- * will be eventually removed. Please, migrate to a new module
- * <code>beam-sdks-java-extensions-avro</code> by importing
- * <code>org.apache.beam.sdk.extensions.avro.coders.AvroCoder</code> instead of this one.
- */
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Deprecated
-public class AvroCoder<T> extends CustomCoder<T> {
-
- /**
- * Returns an {@code AvroCoder} instance for the provided element type.
- *
- * @param <T> the element type
- */
- public static <T> AvroCoder<T> of(TypeDescriptor<T> type) {
- return of(type, true);
- }
-
- /**
- * Returns an {@code AvroCoder} instance for the provided element type, respecting whether to use
- * Avro's Reflect* or Specific* suite for encoding and decoding.
- *
- * @param <T> the element type
- */
- public static <T> AvroCoder<T> of(TypeDescriptor<T> type, boolean useReflectApi) {
- @SuppressWarnings("unchecked")
- Class<T> clazz = (Class<T>) type.getRawType();
- return of(clazz, useReflectApi);
- }
-
- /**
- * Returns an {@code AvroCoder} instance for the provided element class.
- *
- * @param <T> the element type
- */
- public static <T> AvroCoder<T> of(Class<T> clazz) {
- return of(clazz, true);
- }
-
- /**
- * Returns an {@code AvroGenericCoder} instance for the Avro schema. The implicit type is
- * GenericRecord.
- */
- public static AvroGenericCoder of(Schema schema) {
- return AvroGenericCoder.of(schema);
- }
-
- /**
- * Returns an {@code AvroCoder} instance for the given class, respecting whether to use Avro's
- * Reflect* or Specific* suite for encoding and decoding.
- *
- * @param <T> the element type
- */
- public static <T> AvroCoder<T> of(Class<T> type, boolean useReflectApi) {
- ClassLoader cl = type.getClassLoader();
- SpecificData data = useReflectApi ? new ReflectData(cl) : new SpecificData(cl);
- return of(type, data.getSchema(type), useReflectApi);
- }
-
- /**
- * Returns an {@code AvroCoder} instance for the provided element type using the provided Avro
- * schema.
- *
- * <p>The schema must correspond to the type provided.
- *
- * @param <T> the element type
- */
- public static <T> AvroCoder<T> of(Class<T> type, Schema schema) {
- return of(type, schema, true);
- }
-
- /**
- * Returns an {@code AvroCoder} instance for the given class and schema, respecting whether to use
- * Avro's Reflect* or Specific* suite for encoding and decoding.
- *
- * @param <T> the element type
- */
- public static <T> AvroCoder<T> of(Class<T> type, Schema schema, boolean useReflectApi) {
- return new AvroCoder<>(type, schema, useReflectApi);
- }
-
- /**
- * Returns a {@link CoderProvider} which uses the {@link AvroCoder} if possible for all types.
- *
- * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
- * accept dangerous types such as {@link Object}.
- *
- * <p>This method is invoked reflectively from {@link DefaultCoder}.
- */
- @SuppressWarnings("unused")
- public static CoderProvider getCoderProvider() {
- return new AvroCoderProvider();
- }
-
- /**
- * A {@link CoderProvider} that constructs an {@link AvroCoder} for Avro compatible classes.
- *
- * <p>It is unsafe to register this as a {@link CoderProvider} because Avro will reflectively
- * accept dangerous types such as {@link Object}.
- */
- static class AvroCoderProvider extends CoderProvider {
- @Override
- public <T> Coder<T> coderFor(
- TypeDescriptor<T> typeDescriptor, List<? extends Coder<?>> componentCoders)
- throws CannotProvideCoderException {
- try {
- return AvroCoder.of(typeDescriptor);
- } catch (AvroRuntimeException e) {
- throw new CannotProvideCoderException(
- String.format("%s is not compatible with Avro", typeDescriptor), e);
- }
- }
- }
-
- private final Class<T> type;
- private final boolean useReflectApi;
- private final SerializableSchemaSupplier schemaSupplier;
- private final TypeDescriptor<T> typeDescriptor;
-
- private final List<String> nonDeterministicReasons;
-
- // Factories allocated by .get() are thread-safe and immutable.
- private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
- private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();
-
- /**
- * A {@link Serializable} object that holds the {@link String} version of a {@link Schema}. This
- * is paired with the {@link SerializableSchemaSupplier} via {@link Serializable}'s usage of the
- * {@link #readResolve} method.
- */
- private static class SerializableSchemaString implements Serializable {
- private final String schema;
-
- private SerializableSchemaString(String schema) {
- this.schema = schema;
- }
-
- private Object readResolve() throws IOException, ClassNotFoundException {
- return new SerializableSchemaSupplier(new Schema.Parser().parse(schema));
- }
- }
-
- /**
- * A {@link Serializable} object that delegates to the {@link SerializableSchemaString} via {@link
- * Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize Java's
- * serialization and hence is able to encode the {@link Schema} object directly.
- */
- private static class SerializableSchemaSupplier implements Serializable, Supplier<Schema> {
- // writeReplace makes this object serializable. This is a limitation of FindBugs as discussed
- // here:
- // http://stackoverflow.com/questions/26156523/is-writeobject-not-neccesary-using-the-serialization-proxy-pattern
- @SuppressFBWarnings("SE_BAD_FIELD")
- private final Schema schema;
-
- private SerializableSchemaSupplier(Schema schema) {
- this.schema = schema;
- }
-
- private Object writeReplace() {
- return new SerializableSchemaString(schema.toString());
- }
-
- @Override
- public Schema get() {
- return schema;
- }
- }
-
- /**
- * A {@link Serializable} object that lazily supplies a {@link ReflectData} built from the
- * appropriate {@link ClassLoader} for the type encoded by this {@link AvroCoder}.
- */
- private static class SerializableReflectDataSupplier
- implements Serializable, Supplier<ReflectData> {
-
- private final Class<?> clazz;
-
- private SerializableReflectDataSupplier(Class<?> clazz) {
- this.clazz = clazz;
- }
-
- @Override
- public ReflectData get() {
- ReflectData reflectData = new ReflectData(clazz.getClassLoader());
- reflectData.addLogicalTypeConversion(new JodaTimestampConversion());
- return reflectData;
- }
- }
-
- // Cache the old encoder/decoder and let the factories reuse them when possible. To be threadsafe,
- // these are ThreadLocal. This code does not need to be re-entrant as AvroCoder does not use
- // an inner coder.
- private final EmptyOnDeserializationThreadLocal<BinaryDecoder> decoder;
- private final EmptyOnDeserializationThreadLocal<BinaryEncoder> encoder;
- private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
- private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;
-
- // Lazily re-instantiated after deserialization
- private final Supplier<ReflectData> reflectData;
-
- protected AvroCoder(Class<T> type, Schema schema) {
- this(type, schema, false);
- }
-
- protected AvroCoder(Class<T> type, Schema schema, boolean useReflectApi) {
- this.type = type;
- this.useReflectApi = useReflectApi;
- this.schemaSupplier = new SerializableSchemaSupplier(schema);
- typeDescriptor = TypeDescriptor.of(type);
- nonDeterministicReasons = new AvroDeterminismChecker().check(TypeDescriptor.of(type), schema);
-
- // Decoder and Encoder start off null for each thread. They are allocated and potentially
- // reused inside encode/decode.
- this.decoder = new EmptyOnDeserializationThreadLocal<>();
- this.encoder = new EmptyOnDeserializationThreadLocal<>();
-
- this.reflectData = Suppliers.memoize(new SerializableReflectDataSupplier(getType()));
-
- // Reader and writer are allocated once per thread per Coder
- this.reader =
- new EmptyOnDeserializationThreadLocal<DatumReader<T>>() {
- private final AvroCoder<T> myCoder = AvroCoder.this;
-
- @Override
- public DatumReader<T> initialValue() {
- if (myCoder.getType().equals(GenericRecord.class)) {
- return new GenericDatumReader<>(myCoder.getSchema());
- } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) {
- return new SpecificDatumReader<>(myCoder.getType());
- }
- return new ReflectDatumReader<>(
- myCoder.getSchema(), myCoder.getSchema(), myCoder.reflectData.get());
- }
- };
-
- this.writer =
- new EmptyOnDeserializationThreadLocal<DatumWriter<T>>() {
- private final AvroCoder<T> myCoder = AvroCoder.this;
-
- @Override
- public DatumWriter<T> initialValue() {
- if (myCoder.getType().equals(GenericRecord.class)) {
- return new GenericDatumWriter<>(myCoder.getSchema());
- } else if (SpecificRecord.class.isAssignableFrom(myCoder.getType()) && !useReflectApi) {
- return new SpecificDatumWriter<>(myCoder.getType());
- }
- return new ReflectDatumWriter<>(myCoder.getSchema(), myCoder.reflectData.get());
- }
- };
- }
-
- /** Returns the type this coder encodes/decodes. */
- public Class<T> getType() {
- return type;
- }
-
- public boolean useReflectApi() {
- return useReflectApi;
- }
-
- @Override
- public void encode(T value, OutputStream outStream) throws IOException {
- // Get a BinaryEncoder instance from the ThreadLocal cache and attempt to reuse it.
- BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
- // Save the potentially-new instance for reuse later.
- encoder.set(encoderInstance);
- writer.get().write(value, encoderInstance);
- // Direct binary encoder does not buffer any data and need not be flushed.
- }
-
- @Override
- public T decode(InputStream inStream) throws IOException {
- // Get a BinaryDecoder instance from the ThreadLocal cache and attempt to reuse it.
- BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
- // Save the potentially-new instance for later.
- decoder.set(decoderInstance);
- return reader.get().read(null, decoderInstance);
- }
-
- /**
- * @throws NonDeterministicException when the type may not be deterministically encoded using the
- * given {@link Schema}, the {@code directBinaryEncoder}, and the {@link ReflectDatumWriter}
- * or {@link GenericDatumWriter}.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- if (!nonDeterministicReasons.isEmpty()) {
- throw new NonDeterministicException(this, nonDeterministicReasons);
- }
- }
-
- /** Returns the schema used by this coder. */
- public Schema getSchema() {
- return schemaSupplier.get();
- }
-
- @Override
- public TypeDescriptor<T> getEncodedTypeDescriptor() {
- return typeDescriptor;
- }
-
- /**
- * Helper class encapsulating the various pieces of state maintained by the recursive walk used
- * for checking if the encoding will be deterministic.
- */
- private static class AvroDeterminismChecker {
-
- // Reasons that the original type are not deterministic. This accumulates
- // the actual output.
- private List<String> reasons = new ArrayList<>();
-
- // Types that are currently "open". Used to make sure we don't have any
- // recursive types. Note that we assume that all occurrences of a given type
- // are equal, rather than tracking pairs of type + schema.
- private Set<TypeDescriptor<?>> activeTypes = new HashSet<>();
-
- // Similarly to how we record active types, we record the schemas we visit
- // to make sure we don't encounter recursive fields.
- private Set<Schema> activeSchemas = new HashSet<>();
-
- /** Report an error in the current context. */
- @FormatMethod
- private void reportError(String context, @FormatString String fmt, Object... args) {
- String message = String.format(fmt, args);
- reasons.add(context + ": " + message);
- }
-
- /**
- * Classes that are serialized by Avro as a String include
- *
- * <ul>
- * <li>Subtypes of CharSequence (including String, Avro's mutable Utf8, etc.)
- * <li>Several predefined classes (BigDecimal, BigInteger, URI, URL)
- * <li>Classes annotated with @Stringable (uses their #toString() and a String constructor)
- * </ul>
- *
- * <p>Rather than determine which of these cases are deterministic, we list some classes that
- * definitely are, and treat any others as non-deterministic.
- */
- private static final Set<Class<?>> DETERMINISTIC_STRINGABLE_CLASSES = new HashSet<>();
-
- static {
- // CharSequences:
- DETERMINISTIC_STRINGABLE_CLASSES.add(String.class);
- DETERMINISTIC_STRINGABLE_CLASSES.add(Utf8.class);
-
- // Explicitly Stringable:
- DETERMINISTIC_STRINGABLE_CLASSES.add(java.math.BigDecimal.class);
- DETERMINISTIC_STRINGABLE_CLASSES.add(java.math.BigInteger.class);
- DETERMINISTIC_STRINGABLE_CLASSES.add(java.net.URI.class);
- DETERMINISTIC_STRINGABLE_CLASSES.add(java.net.URL.class);
-
- // Classes annotated with @Stringable:
- }
-
- /** Return true if the given type token is a subtype of *any* of the listed parents. */
- private static boolean isSubtypeOf(TypeDescriptor<?> type, Class<?>... parents) {
- for (Class<?> parent : parents) {
- if (type.isSubtypeOf(TypeDescriptor.of(parent))) {
- return true;
- }
- }
- return false;
- }
-
- protected AvroDeterminismChecker() {}
-
- // The entry point for the check. Should not be recursively called.
- public List<String> check(TypeDescriptor<?> type, Schema schema) {
- recurse(type.getRawType().getName(), type, schema);
- return reasons;
- }
-
- // This is the method that should be recursively called. It sets up the path
- // and visited types correctly.
- private void recurse(String context, TypeDescriptor<?> type, Schema schema) {
- if (type.getRawType().isAnnotationPresent(AvroSchema.class)) {
- reportError(context, "Custom schemas are not supported -- remove @AvroSchema.");
- return;
- }
-
- if (!activeTypes.add(type)) {
- reportError(context, "%s appears recursively", type);
- return;
- }
-
- // If the record isn't a true class, but rather a GenericRecord, SpecificRecord, etc.
- // with a specified schema, then we need to make the decision based on the generated
- // implementations.
- if (isSubtypeOf(type, IndexedRecord.class)) {
- checkIndexedRecord(context, schema, null);
- } else {
- doCheck(context, type, schema);
- }
-
- activeTypes.remove(type);
- }
-
- private void doCheck(String context, TypeDescriptor<?> type, Schema schema) {
- switch (schema.getType()) {
- case ARRAY:
- checkArray(context, type, schema);
- break;
- case ENUM:
- // Enums should be deterministic, since they depend only on the ordinal.
- break;
- case FIXED:
- // Depending on the implementation of GenericFixed, we don't know how
- // the given field will be encoded. So, we assume that it isn't
- // deterministic.
- reportError(context, "FIXED encodings are not guaranteed to be deterministic");
- break;
- case MAP:
- checkMap(context, type, schema);
- break;
- case RECORD:
- if (!(type.getType() instanceof Class)) {
- reportError(context, "Cannot determine type from generic %s due to erasure", type);
- return;
- }
- checkRecord(type, schema);
- break;
- case UNION:
- checkUnion(context, type, schema);
- break;
- case STRING:
- checkString(context, type);
- break;
- case BOOLEAN:
- case BYTES:
- case DOUBLE:
- case INT:
- case FLOAT:
- case LONG:
- case NULL:
- // For types that Avro encodes using one of the above primitives, we assume they are
- // deterministic.
- break;
- default:
- // In any other case (eg., new types added to Avro) we cautiously return
- // false.
- reportError(context, "Unknown schema type %s may be non-deterministic", schema.getType());
- break;
- }
- }
-
- private void checkString(String context, TypeDescriptor<?> type) {
- // For types that are encoded as strings, we need to make sure they're in an approved
- // list. For other types that are annotated @Stringable, Avro will just use the
- // #toString() methods, which has no guarantees of determinism.
- if (!DETERMINISTIC_STRINGABLE_CLASSES.contains(type.getRawType())) {
- reportError(context, "%s may not have deterministic #toString()", type);
- }
- }
-
- private static final Schema AVRO_NULL_SCHEMA = Schema.create(Schema.Type.NULL);
-
- private void checkUnion(String context, TypeDescriptor<?> type, Schema schema) {
- final List<Schema> unionTypes = schema.getTypes();
-
- if (!type.getRawType().isAnnotationPresent(Union.class)) {
- // First check for @Nullable field, which shows up as a union of field type and null.
- if (unionTypes.size() == 2 && unionTypes.contains(AVRO_NULL_SCHEMA)) {
- // Find the Schema that is not NULL and recursively check that it is deterministic.
- Schema nullableFieldSchema =
- unionTypes.get(0).equals(AVRO_NULL_SCHEMA) ? unionTypes.get(1) : unionTypes.get(0);
- doCheck(context, type, nullableFieldSchema);
- return;
- }
-
- // Otherwise report a schema error.
- reportError(context, "Expected type %s to have @Union annotation", type);
- return;
- }
-
- // Errors associated with this union will use the base class as their context.
- String baseClassContext = type.getRawType().getName();
-
- // For a union, we need to make sure that each possible instantiation is deterministic.
- for (Schema concrete : unionTypes) {
- @SuppressWarnings("unchecked")
- TypeDescriptor<?> unionType = TypeDescriptor.of(ReflectData.get().getClass(concrete));
-
- recurse(baseClassContext, unionType, concrete);
- }
- }
-
- private void checkRecord(TypeDescriptor<?> type, Schema schema) {
- // For a record, we want to make sure that all the fields are deterministic.
- Class<?> clazz = type.getRawType();
- for (Schema.Field fieldSchema : schema.getFields()) {
- Field field = getField(clazz, fieldSchema.name());
- String fieldContext = field.getDeclaringClass().getName() + "#" + field.getName();
-
- if (field.isAnnotationPresent(AvroEncode.class)) {
- reportError(
- fieldContext, "Custom encoders may be non-deterministic -- remove @AvroEncode");
- continue;
- }
-
- if (!IndexedRecord.class.isAssignableFrom(field.getType())
- && field.isAnnotationPresent(AvroSchema.class)) {
- // TODO: We should be able to support custom schemas on POJO fields, but we shouldn't
- // need to, so we just allow it in the case of IndexedRecords.
- reportError(
- fieldContext, "Custom schemas are only supported for subtypes of IndexedRecord.");
- continue;
- }
-
- TypeDescriptor<?> fieldType = type.resolveType(field.getGenericType());
- recurse(fieldContext, fieldType, fieldSchema.schema());
- }
- }
-
- private void checkIndexedRecord(
- String context, Schema schema, @Nullable String specificClassStr) {
-
- if (!activeSchemas.add(schema)) {
- reportError(context, "%s appears recursively", schema.getName());
- return;
- }
-
- switch (schema.getType()) {
- case ARRAY:
- // Generic Records use GenericData.Array to implement arrays, which is
- // essentially an ArrayList, and therefore ordering is deterministic.
- // The array is thus deterministic if the elements are deterministic.
- checkIndexedRecord(context, schema.getElementType(), null);
- break;
- case ENUM:
- // Enums are deterministic because they encode as a single integer.
- break;
- case FIXED:
- // In the case of GenericRecords, FIXED is deterministic because it
- // encodes/decodes as a Byte[].
- break;
- case MAP:
- reportError(
- context,
- "GenericRecord and SpecificRecords use a HashMap to represent MAPs,"
- + " so it is non-deterministic");
- break;
- case RECORD:
- for (Schema.Field field : schema.getFields()) {
- checkIndexedRecord(
- schema.getName() + "." + field.name(),
- field.schema(),
- field.getProp(SpecificData.CLASS_PROP));
- }
- break;
- case STRING:
- // GenericDatumWriter#findStringClass will use a CharSequence or a String
- // for each string, so it is deterministic.
-
- // SpecificCompiler#getStringType will use java.lang.String, org.apache.avro.util.Utf8,
- // or java.lang.CharSequence, unless SpecificData.CLASS_PROP overrides that.
- if (specificClassStr != null) {
- Class<?> specificClass;
- try {
- specificClass = ClassUtils.forName(specificClassStr);
- if (!DETERMINISTIC_STRINGABLE_CLASSES.contains(specificClass)) {
- reportError(
- context,
- "Specific class %s is not known to be deterministic",
- specificClassStr);
- }
- } catch (ClassNotFoundException e) {
- reportError(
- context, "Specific class %s is not known to be deterministic", specificClassStr);
- }
- }
- break;
- case UNION:
- for (Schema subschema : schema.getTypes()) {
- checkIndexedRecord(subschema.getName(), subschema, null);
- }
- break;
- case BOOLEAN:
- case BYTES:
- case DOUBLE:
- case INT:
- case FLOAT:
- case LONG:
- case NULL:
- // For types that Avro encodes using one of the above primitives, we assume they are
- // deterministic.
- break;
- default:
- reportError(context, "Unknown schema type %s may be non-deterministic", schema.getType());
- break;
- }
-
- activeSchemas.remove(schema);
- }
-
- private void checkMap(String context, TypeDescriptor<?> type, Schema schema) {
- if (!isSubtypeOf(type, SortedMap.class)) {
- reportError(context, "%s may not be deterministically ordered", type);
- }
-
- // Avro (currently) asserts that all keys are strings.
- // In case that changes, we double check that the key was a string:
- Class<?> keyType = type.resolveType(Map.class.getTypeParameters()[0]).getRawType();
- if (!String.class.equals(keyType)) {
- reportError(context, "map keys should be Strings, but was %s", keyType);
- }
-
- recurse(context, type.resolveType(Map.class.getTypeParameters()[1]), schema.getValueType());
- }
-
- private void checkArray(String context, TypeDescriptor<?> type, Schema schema) {
- TypeDescriptor<?> elementType = null;
- if (type.isArray()) {
- // The type is an array (with ordering)-> deterministic iff the element is deterministic.
- elementType = type.getComponentType();
- } else if (isSubtypeOf(type, Collection.class)) {
- if (isSubtypeOf(type, List.class, SortedSet.class)) {
- // Ordered collection -> deterministic iff the element is deterministic
- elementType = type.resolveType(Collection.class.getTypeParameters()[0]);
- } else {
- // Not an ordered collection -> not deterministic
- reportError(context, "%s may not be deterministically ordered", type);
- return;
- }
- } else {
- // If it was an unknown type encoded as an array, be conservative and assume
- // that we don't know anything about the order.
- reportError(context, "encoding %s as an ARRAY was unexpected", type);
- return;
- }
-
- // If we get here, it's either a deterministically-ordered Collection, or
- // an array. Either way, the type is deterministic iff the element type is
- // deterministic.
- recurse(context, elementType, schema.getElementType());
- }
-
- /**
- * Extract a field from a class. We need to look at the declared fields so that we can see
- * private fields. We may need to walk up to the parent to get classes from the parent.
- */
- private static Field getField(Class<?> originalClazz, String name) {
- Class<?> clazz = originalClazz;
- while (clazz != null) {
- for (Field field : clazz.getDeclaredFields()) {
- AvroName avroName = field.getAnnotation(AvroName.class);
- if (avroName != null && name.equals(avroName.value())) {
- return field;
- } else if (avroName == null && name.equals(field.getName())) {
- return field;
- }
- }
- clazz = clazz.getSuperclass();
- }
-
- throw new IllegalArgumentException("Unable to get field " + name + " from " + originalClazz);
- }
- }
-
- @Override
- public boolean equals(@Nullable Object other) {
- if (other == this) {
- return true;
- }
- if (!(other instanceof AvroCoder)) {
- return false;
- }
- AvroCoder<?> that = (AvroCoder<?>) other;
- return Objects.equals(this.schemaSupplier.get(), that.schemaSupplier.get())
- && Objects.equals(this.typeDescriptor, that.typeDescriptor)
- && this.useReflectApi == that.useReflectApi;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(schemaSupplier.get(), typeDescriptor, useReflectApi);
- }
-
- /**
- * Conversion for DateTime.
- *
- * <p>This is a copy from Avro 1.8's TimestampConversion, which is renamed in Avro 1.9. Defining
- * our own copy gives flexibility for the Beam Java SDK to work with Avro 1.8 and 1.9 at runtime.
- *
- * @see <a href="https://issues.apache.org/jira/browse/BEAM-9144">BEAM-9144: Beam's own Avro
- * TimeConversion class in beam-sdk-java-core</a>
- */
- public static class JodaTimestampConversion extends Conversion<DateTime> {
- @Override
- public Class getConvertedType() {
- return DateTime.class;
- }
-
- @Override
- public String getLogicalTypeName() {
- return "timestamp-millis";
- }
-
- @Override
- public DateTime fromLong(Long millisFromEpoch, Schema schema, LogicalType type) {
- return new DateTime(millisFromEpoch, DateTimeZone.UTC);
- }
-
- @Override
- public Long toLong(DateTime timestamp, Schema schema, LogicalType type) {
- return timestamp.getMillis();
- }
- }
-}
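
For reference, the encode/decode path being deleted above reduces to a plain Avro binary round trip over the thread-local encoder/decoder. A minimal sketch of that behavior against the pre-removal API (the Point POJO is illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.coders.AvroCoder; // pre-removal location

public class AvroCoderRoundTrip {
  // A shape AvroCoder can reflect on: concrete, with a no-argument constructor.
  public static class Point {
    int x;
    int y;
    public Point() {}
  }

  public static void main(String[] args) throws IOException {
    // Schema is inferred via Avro's ReflectData, as in of(Class).
    AvroCoder<Point> coder = AvroCoder.of(Point.class);
    Point p = new Point();
    p.x = 3;
    p.y = 4;
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(p, out); // direct binary encoding; no flush required
    Point copy = coder.decode(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(copy.x + "," + copy.y); // prints 3,4
  }
}
```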
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java
deleted file mode 100644
index 7d90206ce4c5a..0000000000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/AvroGenericCoder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-
-/**
- * AvroCoder specialisation for GenericRecord.
- *
- * @deprecated Avro related classes are deprecated in module <code>beam-sdks-java-core</code> and
- * will be eventually removed. Please, migrate to a new module
- * <code>beam-sdks-java-extensions-avro</code> by importing
- * <code>org.apache.beam.sdk.extensions.avro.coders.AvroGenericCoder</code> instead of this one.
- */
-@Deprecated
-public class AvroGenericCoder extends AvroCoder<GenericRecord> {
- AvroGenericCoder(Schema schema) {
- super(GenericRecord.class, schema);
- }
-
- public static AvroGenericCoder of(Schema schema) {
- return new AvroGenericCoder(schema);
- }
-}
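
As the deleted subclass shows, AvroGenericCoder only pins the element type to GenericRecord. A short usage sketch against the pre-removal API (schema and field names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroGenericCoder; // pre-removal location

public class AvroGenericCoderDemo {
  public static void main(String[] args) throws Exception {
    // An illustrative two-field record schema.
    Schema schema =
        new Schema.Parser()
            .parse(
                "{\"type\":\"record\",\"name\":\"Pair\",\"fields\":"
                    + "[{\"name\":\"k\",\"type\":\"string\"},{\"name\":\"v\",\"type\":\"long\"}]}");
    AvroGenericCoder coder = AvroGenericCoder.of(schema);
    GenericRecord rec = new GenericData.Record(schema);
    rec.put("k", "answer");
    rec.put("v", 42L);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    coder.encode(rec, out);
    GenericRecord copy = coder.decode(new ByteArrayInputStream(out.toByteArray()));
    System.out.println(copy); // {"k": "answer", "v": 42}
  }
}
```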
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
deleted file mode 100644
index 5660921fa4022..0000000000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ /dev/null
@@ -1,2031 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io;
-
-import static org.apache.beam.sdk.io.FileIO.ReadMatches.DirectoryTreatment;
-import static org.apache.beam.sdk.io.ReadAllViaFileBasedSource.ReadFileRangesFnExceptionHandler;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
-import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.auto.value.AutoValue;
-import java.io.IOException;
-import java.io.Serializable;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
-import java.util.Map;
-import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
-import org.apache.avro.file.DataFileConstants;
-import org.apache.avro.file.DataFileWriter;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.reflect.ReflectData;
-import org.apache.avro.reflect.ReflectDatumWriter;
-import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileIO.MatchConfiguration;
-import org.apache.beam.sdk.io.FileIO.ReadableFile;
-import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
-import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.ValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
-import org.apache.beam.sdk.schemas.utils.AvroUtils;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.SerializableFunction;
-import org.apache.beam.sdk.transforms.SerializableFunctions;
-import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.TypeDescriptors;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
-import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.Duration;
-
-/**
- * {@link PTransform}s for reading and writing Avro files.
- *
- * <h2>Reading Avro files</h2>
- *
- * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
- * pipeline construction time, use {@link #read}, using {@link AvroIO.Read#from} to specify the
- * filename or filepattern to read from. If the filepatterns to be read are themselves in a {@link
- * PCollection} you can use {@link FileIO} to match them and {@link AvroIO#readFiles} to read them.
- * If the schema is unknown at pipeline construction time, use {@link #parseGenericRecords} or
- * {@link #parseFilesGenericRecords}.
- *
- * <p>Many configuration options below apply to several or all of these transforms.
- *
- * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
- *
- * <h3>Filepattern expansion and watching</h3>
- *
- * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} or the
- * combination of {@link FileIO.Match#continuously(Duration, TerminationCondition)} and {@link
- * AvroIO#readFiles(Class)} allow streaming of new files matching the filepattern(s).
- *
- * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link
- * AvroIO#readFiles(Class)} allows them in case the filepattern contains a glob wildcard character.
- * Use {@link Read#withEmptyMatchTreatment} or {@link
- * FileIO.Match#withEmptyMatchTreatment(EmptyMatchTreatment)} plus {@link AvroIO#readFiles(Class)}
- * to configure this behavior.
- *
- * <h3>Reading records of a known schema</h3>
- *
- * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
- * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
- * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
- * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
- * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link FileIO} matching
- * plus {@link #readFilesGenericRecords}.
- *
- * <p>For example:
- *
- * <pre>{@code
- * Pipeline p = ...;
- *
- * // Read Avro-generated classes from files on GCS
- * PCollection<AvroAutoGenClass> records =
- *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
- *
- * // Read GenericRecord's of the given schema from files on GCS
- * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
- * PCollection<GenericRecord> records =
- *     p.apply(AvroIO.readGenericRecords(schema)
- *                .from("gs://my_bucket/path/to/records-*.avro"));
- * }</pre>
- *
- * <h3>Reading records of an unknown schema</h3>
- *
- * <p>To read records from files whose schema is unknown at pipeline construction time or differs
- * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
- * parsing function for converting each {@link GenericRecord} into a value of your custom type.
- * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link FileIO}
- * matching plus {@link #parseFilesGenericRecords(SerializableFunction)}.
- *
- * <p>For example:
- *
- * <pre>{@code
- * Pipeline p = ...;
- *
- * PCollection<Foo> records =
- *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
- *       public Foo apply(GenericRecord record) {
- *         // If needed, access the schema of the record using record.getSchema()
- *         return ...;
- *       }
- *     }));
- * }</pre>
- *
- * <h3>Reading from a {@link PCollection} of filepatterns</h3>
- *
- * <pre>{@code
- * Pipeline p = ...;
- *
- * PCollection<String> filepatterns = p.apply(...);
- * PCollection<AvroAutoGenClass> records =
- *     filepatterns.apply(AvroIO.readAll(AvroAutoGenClass.class));
- * PCollection<AvroAutoGenClass> records =
- *     filepatterns
- *         .apply(FileIO.matchAll())
- *         .apply(FileIO.readMatches())
- *         .apply(AvroIO.readFiles(AvroAutoGenClass.class));
- * PCollection<GenericRecord> genericRecords =
- *     filepatterns.apply(AvroIO.readAllGenericRecords(schema));
- * PCollection<Foo> records =
- *     filepatterns
- *         .apply(FileIO.matchAll())
- *         .apply(FileIO.readMatches())
- *         .apply(AvroIO.parseFilesGenericRecords(new SerializableFunction...);
- * }</pre>
- *
- * <h3>Streaming new files matching a filepattern</h3>
- *
- * <pre>{@code
- * Pipeline p = ...;
- *
- * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
- *     .read(AvroAutoGenClass.class)
- *     .from("gs://my_bucket/path/to/records-*.avro")
- *     .watchForNewFiles(
- *       // Check for new files every minute
- *       Duration.standardMinutes(1),
- *       // Stop watching the filepattern if no new files appear within an hour
- *       afterTimeSinceNewOutput(Duration.standardHours(1))));
- * }</pre>
- *
- * <h3>Reading a very large number of files</h3>
- *
- * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small number
- * of files.
- *
- * <h3>Inferring Beam schemas from Avro files</h3>
- *
- * <p>If you want to use SQL or schema based operations on an Avro-based PCollection, you must
- * configure the read transform to infer the Beam schema and automatically setup the Beam related
- * coders by doing:
- *
- * <pre>{@code
- * PCollection<AvroAutoGenClass> records =
- *     p.apply(AvroIO.read(...).from(...).withBeamSchemas(true));
- * }</pre>
- *
- * <h3>Inferring Beam schemas from Avro PCollections</h3>
- *
- * <p>If you created an Avro-based PCollection by other means e.g. reading records from Kafka or as
- * the output of another PTransform, you may be interested in making your PCollection schema-aware
- * so you can use the Schema-based APIs or Beam's SqlTransform.
- *
- * <p>If you are using Avro specific records (generated classes from an Avro schema), you can
- * register a schema provider for the specific Avro class to make any PCollection of these objects
- * schema-aware.
- *
- * <pre>{@code
- * pipeline.getSchemaRegistry().registerSchemaProvider(AvroAutoGenClass.class, AvroAutoGenClass.getClassSchema());
- * }</pre>
- *
- * <p>You can also manually set an Avro-backed Schema coder for a PCollection using {@link
- * org.apache.beam.sdk.schemas.utils.AvroUtils#schemaCoder(Class, Schema)} to make it schema-aware.
- *
- * <pre>{@code
- * PCollection<AvroAutoGenClass> records = ...
- * AvroCoder<AvroAutoGenClass> coder = (AvroCoder<AvroAutoGenClass>) users.getCoder();
- * records.setCoder(AvroUtils.schemaCoder(coder.getType(), coder.getSchema()));
- * }</pre>
- *
- * <p>If you are using GenericRecords you may need to set a specific Beam schema coder for each
- * PCollection to match their internal Avro schema.
- *
- * <pre>{@code
- * org.apache.avro.Schema avroSchema = ...
- * PCollection<GenericRecord> records = ...
- * records.setCoder(AvroUtils.schemaCoder(avroSchema));
- * }</pre>
- *
- * <h2>Writing Avro files</h2>
- *
- * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
- * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
- * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
- * via {@link Write#withShardNameTemplate(String)}) and optional filename suffix (set via {@link
- * Write#withSuffix(String)}, to generate output filenames in a sharded way. You can override this
- * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a
- * custom file naming policy.
- *
- * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
- * org.apache.avro.file.Codec CodecFactory.snappyCodec()}. This default can be changed or overridden
- * using {@link AvroIO.Write#withCodec}.
- *
- * <h3>Writing specific or generic records</h3>
- *
- * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
- * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
- * a {@link Schema} object, or {@link #writeGenericRecords(String)} which takes a schema in a
- * JSON-encoded string form. An exception will be thrown if a record doesn't match the specified
- * schema.
- *
- * <p>For example:
- *
- * <pre>{@code
- * // A simple Write to a local file (only runs locally):
- * PCollection<AvroAutoGenClass> records = ...;
- * records.apply(AvroIO.write(AvroAutoGenClass.class).to("/path/to/file.avro"));
- *
- * // A Write to a sharded GCS file (runs locally and using remote execution):
- * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
- * PCollection<GenericRecord> records = ...;
- * records.apply("WriteToAvro", AvroIO.writeGenericRecords(schema)
- *     .to("gs://my_bucket/path/to/numbers")
- *     .withSuffix(".avro"));
- * }</pre>
- *
- * <h3>Writing windowed or unbounded data</h3>
- *
- * <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
- * will cause windowing and triggering to be preserved. When producing windowed writes with a
- * streaming runner that supports triggers, the number of output shards must be set explicitly using
- * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
- * value, so you may not need to set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set,
- * and unique windows and triggers must produce unique filenames.
- *
- * <h3>Writing data to multiple destinations</h3>
- *
- * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
- * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
- * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
- * as an integer field. We want events for each user to go into a specific directory for that user,
- * and each user's data should be written with a specific schema for that user; a side input is
- * used, so the schema can be calculated in a different stage.
- *
- * <pre>{@code
- * // This is the user class that controls dynamic destinations for this avro write. The input to
- * // AvroIO.Write will be UserEvent, and we will be writing GenericRecords to the file (in order
- * // to have dynamic schemas). Everything is per userid, so we define a dynamic destination type
- * // of Integer.
- * class UserDynamicAvroDestinations
- *     extends DynamicAvroDestinations<UserEvent, Integer, GenericRecord> {
- *   private final PCollectionView<Map<Integer, String>> userToSchemaMap;
- *   ...
- * }
- * }</pre>
- *
- * @deprecated Avro related classes are deprecated in module <code>beam-sdks-java-core</code> and
- * will be eventually removed. Please, migrate to a new module
- * <code>beam-sdks-java-extensions-avro</code> by importing
- * <code>org.apache.beam.sdk.extensions.avro.io.AvroIO</code> instead of this one.
- */
-@SuppressWarnings({
- "nullness" // TODO(https://github.com/apache/beam/issues/20497)
-})
-@Deprecated
-public class AvroIO {
- /**
- * Reads records of the given type from an Avro file (or multiple Avro files matching a pattern).
- *
- * <p>The schema must be specified using one of the {@code withSchema} functions.
- */
- public static <T> Read<T> read(Class<T> recordClass) {
- return new AutoValue_AvroIO_Read.Builder<T>()
- .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
- .setRecordClass(recordClass)
- .setSchema(ReflectData.get().getSchema(recordClass))
- .setInferBeamSchema(false)
- .setHintMatchesManyFiles(false)
- .build();
- }
-
- /**
- * Like {@link #read}, but reads each file in a {@link PCollection} of {@link ReadableFile},
- * returned by {@link FileIO#readMatches}.
- *
- * <p>You can read {@link GenericRecord} by using {@code #readFiles(GenericRecord.class)} or
- * {@code #readFiles(new Schema.Parser().parse(schema))} if the schema is a String.
- */
- public static <T> ReadFiles<T> readFiles(Class<T> recordClass) {
- return new AutoValue_AvroIO_ReadFiles.Builder<T>()
- .setRecordClass(recordClass)
- .setSchema(ReflectData.get().getSchema(recordClass))
- .setInferBeamSchema(false)
- .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
- .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
- .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
- .build();
- }
-
- /**
- * Like {@link #read}, but reads each filepattern in the input {@link PCollection}.
- *
- * @deprecated You can achieve the functionality of {@link #readAll} using {@link FileIO} matching
- * plus {@link #readFiles(Class)}. This is the preferred method to make composition explicit.
- * {@link ReadAll} will not receive upgrades and will be removed in a future version of Beam.
- */
- @Deprecated
- public static <T> ReadAll<T> readAll(Class<T> recordClass) {
- return new AutoValue_AvroIO_ReadAll.Builder<T>()
- .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
- .setRecordClass(recordClass)
- .setSchema(ReflectData.get().getSchema(recordClass))
- .setInferBeamSchema(false)
- .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
- .build();
- }
-
- /** Reads Avro file(s) containing records of the specified schema. */
- public static Read<GenericRecord> readGenericRecords(Schema schema) {
- return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
- .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
- .setRecordClass(GenericRecord.class)
- .setSchema(schema)
- .setInferBeamSchema(false)
- .setHintMatchesManyFiles(false)
- .build();
- }
-
- /**
- * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
- * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
- */
- public static ReadFiles<GenericRecord> readFilesGenericRecords(Schema schema) {
- return new AutoValue_AvroIO_ReadFiles.Builder<GenericRecord>()
- .setRecordClass(GenericRecord.class)
- .setSchema(schema)
- .setInferBeamSchema(false)
- .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
- .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
- .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
- .build();
- }
-
- /**
- * Like {@link #readGenericRecords(Schema)}, but for a {@link PCollection} of {@link
- * ReadableFile}, for example, returned by {@link FileIO#readMatches}.
- *
- * @deprecated You can achieve the functionality of {@link #readAllGenericRecords(Schema)} using
- * {@link FileIO} matching plus {@link #readFilesGenericRecords(Schema)}. This is the
- * preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
- * and will be removed in a future version of Beam.
- */
- @Deprecated
- public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
- return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
- .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
- .setRecordClass(GenericRecord.class)
- .setSchema(schema)
- .setInferBeamSchema(false)
- .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
- .build();
- }
-
- /**
- * Reads Avro file(s) containing records of the specified schema. The schema is specified as a
- * JSON-encoded string.
- */
- public static Read<GenericRecord> readGenericRecords(String schema) {
- return readGenericRecords(new Schema.Parser().parse(schema));
- }
-
- /** Like {@link #readGenericRecords(String)}, but for {@link ReadableFile} collections. */
- public static ReadFiles<GenericRecord> readFilesGenericRecords(String schema) {
- return readFilesGenericRecords(new Schema.Parser().parse(schema));
- }
-
- /**
- * Like {@link #readGenericRecords(String)}, but reads each filepattern in the input {@link
- * PCollection}.
- *
- * @deprecated You can achieve the functionality of {@link #readAllGenericRecords(String)} using
- * {@link FileIO} matching plus {@link #readFilesGenericRecords(String)}. This is the
- * preferred method to make composition explicit. {@link ReadAll} will not receive upgrades
- * and will be removed in a future version of Beam.
- */
- @Deprecated
- public static ReadAll<GenericRecord> readAllGenericRecords(String schema) {
- return readAllGenericRecords(new Schema.Parser().parse(schema));
- }
-
- /**
- * Reads Avro file(s) containing records of an unspecified schema, converting each record to a
- * custom type.
- */
- public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
- return new AutoValue_AvroIO_Parse.Builder<T>()
- .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.DISALLOW))
- .setParseFn(parseFn)
- .setHintMatchesManyFiles(false)
- .build();
- }
-
- /**
- * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each {@link ReadableFile} in
- * the input {@link PCollection}.
- */
- public static <T> ParseFiles<T> parseFilesGenericRecords(
- SerializableFunction<GenericRecord, T> parseFn) {
- return new AutoValue_AvroIO_ParseFiles.Builder<T>()
- .setParseFn(parseFn)
- .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
- .setUsesReshuffle(ReadAllViaFileBasedSource.DEFAULT_USES_RESHUFFLE)
- .setFileExceptionHandler(new ReadFileRangesFnExceptionHandler())
- .build();
- }
-
- /**
- * Like {@link #parseGenericRecords(SerializableFunction)}, but reads each filepattern in the
- * input {@link PCollection}.
- *
- * @deprecated You can achieve the functionality of {@link
- * #parseAllGenericRecords(SerializableFunction)} using {@link FileIO} matching plus {@link
- * #parseFilesGenericRecords(SerializableFunction)}. This is the preferred method to make
- * composition explicit. {@link ParseAll} will not receive upgrades and will be removed in a
- * future version of Beam.
- */
- @Deprecated
- public static <T> ParseAll<T> parseAllGenericRecords(
- SerializableFunction<GenericRecord, T> parseFn) {
- return new AutoValue_AvroIO_ParseAll.Builder<T>()
- .setMatchConfiguration(MatchConfiguration.create(EmptyMatchTreatment.ALLOW_IF_WILDCARD))
- .setParseFn(parseFn)
- .setDesiredBundleSizeBytes(DEFAULT_BUNDLE_SIZE_BYTES)
- .build();
- }
-
- /**
- * Writes a {@link PCollection} to an Avro file (or multiple Avro files matching a sharding
- * pattern).
- */
- public static <T> Write<T> write(Class<T> recordClass) {
- return new Write<>(
- AvroIO.<T, T>defaultWriteBuilder()
- .setGenericRecords(false)
- .setSchema(ReflectData.get().getSchema(recordClass))
- .build());
- }
-
- /** Writes Avro records of the specified schema. */
- public static Write<GenericRecord> writeGenericRecords(Schema schema) {
- return new Write<>(
- AvroIO.<GenericRecord, GenericRecord>defaultWriteBuilder()
- .setGenericRecords(true)
- .setSchema(schema)
- .build());
- }
-
- /**
- * A {@link PTransform} that writes a {@link PCollection} to an avro file (or multiple avro files
- * matching a sharding pattern), with each element of the input collection encoded into its own
- * record of type OutputT.
- *
- * <p>This version allows you to apply {@link AvroIO} writes to a PCollection of a custom type
- * {@link UserT}. A format mechanism that converts the input type {@link UserT} to the output type
- * that will be written to the file must be specified. If using a custom {@link
- * DynamicAvroDestinations} object this is done using {@link
- * DynamicAvroDestinations#formatRecord}, otherwise the {@link
- * AvroIO.TypedWrite#withFormatFunction} can be used to specify a format function.
- *
- * <p>The advantage of using a custom type is that it allows a user-provided {@link
- * DynamicAvroDestinations} object, set via {@link AvroIO.Write#to(DynamicAvroDestinations)} to
- * examine the custom type when choosing a destination.
- *
- * <p>If the output type is {@link GenericRecord} use {@link #writeCustomTypeToGenericRecords()}
- * instead.
- */
- public static <UserT, OutputT> TypedWrite<UserT, Void, OutputT> writeCustomType() {
- return AvroIO.<UserT, OutputT>defaultWriteBuilder().setGenericRecords(false).build();
- }
-
- /**
- * Similar to {@link #writeCustomType()}, but specialized for the case where the output type is
- * {@link GenericRecord}. A schema must be specified either in {@link
- * DynamicAvroDestinations#getSchema} or if not using dynamic destinations, by using {@link
- * TypedWrite#withSchema(Schema)}.
- */
- public static <UserT> TypedWrite<UserT, Void, GenericRecord> writeCustomTypeToGenericRecords() {
- return AvroIO.<UserT, GenericRecord>defaultWriteBuilder().setGenericRecords(true).build();
- }
-
- /**
- * Writes Avro records of the specified schema. The schema is specified as a JSON-encoded string.
- */
- public static Write<GenericRecord> writeGenericRecords(String schema) {
- return writeGenericRecords(new Schema.Parser().parse(schema));
- }
-
- private static <UserT, OutputT> TypedWrite.Builder<UserT, Void, OutputT> defaultWriteBuilder() {
- return new AutoValue_AvroIO_TypedWrite.Builder<UserT, Void, OutputT>()
- .setFilenameSuffix(null)
- .setShardTemplate(null)
- .setNumShards(0)
- .setCodec(TypedWrite.DEFAULT_SERIALIZABLE_CODEC)
- .setMetadata(ImmutableMap.of())
- .setWindowedWrites(false)
- .setNoSpilling(false)
- .setSyncInterval(DataFileConstants.DEFAULT_SYNC_INTERVAL);
- }
-
- private static <T> PCollection<T> setBeamSchema(
- PCollection<T> pc, Class<T> clazz, @Nullable Schema schema) {
- return pc.setCoder(AvroUtils.schemaCoder(clazz, schema));
- }
-
- /**
- * 64MB is a reasonable value that allows amortizing the cost of opening files, but is not so
- * large as to exhaust a typical runner's maximum amount of output per ProcessElement call.
- */
- private static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024L;
-
- /** Implementation of {@link #read} and {@link #readGenericRecords}. */
- @AutoValue
- public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
-
- abstract @Nullable ValueProvider<String> getFilepattern();
-
- abstract MatchConfiguration getMatchConfiguration();
-
- abstract @Nullable Class<T> getRecordClass();
-
- abstract @Nullable Schema getSchema();
-
- abstract boolean getInferBeamSchema();
-
- abstract boolean getHintMatchesManyFiles();
-
- abstract Builder<T> toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder<T> {
- abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
-
- abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
-
- abstract Builder<T> setRecordClass(Class<T> recordClass);
-
- abstract Builder<T> setSchema(Schema schema);
-
- abstract Builder<T> setInferBeamSchema(boolean infer);
-
- abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
-
- abstract Read<T> build();
- }
-
- /**
- * Reads from the given filename or filepattern.
- *
- * <p>If it is known that the filepattern will match a very large number of files (at least tens
- * of thousands), use {@link #withHintMatchesManyFiles} for better performance and scalability.
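- * <p>For example (the class and pattern are placeholders):
- * <pre>{@code
- * PCollection<AvroAutoGenClass> records =
- *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my-bucket/path/to/records-*.avro"));
- * }</pre>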
- */
- public Read<T> from(ValueProvider<String> filepattern) {
- return toBuilder().setFilepattern(filepattern).build();
- }
-
- /** Like {@link #from(ValueProvider)}. */
- public Read<T> from(String filepattern) {
- return from(StaticValueProvider.of(filepattern));
- }
-
- /** Sets the {@link MatchConfiguration}. */
- public Read<T> withMatchConfiguration(MatchConfiguration matchConfiguration) {
- return toBuilder().setMatchConfiguration(matchConfiguration).build();
- }
-
- /** Configures whether or not a filepattern matching no files is allowed. */
- public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
- return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
- }
-
- /**
- * Continuously watches for new files matching the filepattern, polling it at the given
- * interval, until the given termination condition is reached. The returned {@link PCollection}
- * is unbounded. If {@code matchUpdatedFiles} is set, also watches for files with timestamp
- * change.
- *
- * <p>This works only in runners supporting splittable {@link
- * org.apache.beam.sdk.transforms.DoFn}.
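- * <p>A sketch of a typical configuration (intervals and pattern are placeholders):
- * <pre>{@code
- * p.apply(AvroIO.read(AvroAutoGenClass.class)
- *     .from("gs://my-bucket/incoming/*.avro")
- *     .watchForNewFiles(
- *         Duration.standardMinutes(1),
- *         Watch.Growth.afterTimeSinceNewOutput(Duration.standardHours(1)),
- *         false));
- * }</pre>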
- */
- public Read<T> watchForNewFiles(
- Duration pollInterval,
- TerminationCondition<String, ?> terminationCondition,
- boolean matchUpdatedFiles) {
- return withMatchConfiguration(
- getMatchConfiguration()
- .continuously(pollInterval, terminationCondition, matchUpdatedFiles));
- }
-
- /**
- * Same as {@link Read#watchForNewFiles(Duration, TerminationCondition, boolean)} with {@code
- * matchUpdatedFiles=false}.
- */
- public Read<T> watchForNewFiles(
- Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
- return watchForNewFiles(pollInterval, terminationCondition, false);
- }
-
- /**
- * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
- * files.
- *
- * <p>This hint may cause a runner to execute the transform differently, in a way that improves
- * performance for this case, but it may worsen performance if the filepattern matches only a
- * small number of files (e.g., in a runner that supports dynamic work rebalancing, it will
- * happen less efficiently within individual files).
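- * <p>For example (illustrative):
- * <pre>{@code
- * AvroIO.read(AvroAutoGenClass.class)
- *     .from("gs://my-bucket/path/to/records-*.avro")
- *     .withHintMatchesManyFiles();
- * }</pre>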
- */
- public Read<T> withHintMatchesManyFiles() {
- return toBuilder().setHintMatchesManyFiles(true).build();
- }
-
- /**
- * If set to true, a Beam schema will be inferred from the Avro schema. This allows the output
- * to be used by SQL and by the schema-transform library.
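- * <p>A sketch of downstream SQL usage ({@code SqlTransform} lives in the Beam SQL extension,
- * which is assumed to be on the classpath):
- * <pre>{@code
- * PCollection<GenericRecord> records =
- *     p.apply(AvroIO.readGenericRecords(schema).from(path).withBeamSchemas(true));
- * records.apply(SqlTransform.query("SELECT name FROM PCOLLECTION"));
- * }</pre>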
- */
- public Read<T> withBeamSchemas(boolean withBeamSchemas) {
- return toBuilder().setInferBeamSchema(withBeamSchemas).build();
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public PCollection<T> expand(PBegin input) {
- checkNotNull(getFilepattern(), "filepattern");
- checkNotNull(getSchema(), "schema");
-
- if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
- PCollection<T> read =
- input.apply(
- "Read",
- org.apache.beam.sdk.io.Read.from(
- createSource(
- getFilepattern(),
- getMatchConfiguration().getEmptyMatchTreatment(),
- getRecordClass(),
- getSchema(),
- null)));
- return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
- }
-
- // All other cases go through FileIO + ReadFiles
- ReadFiles<T> readFiles =
- (getRecordClass() == GenericRecord.class)
- ? (ReadFiles<T>) readFilesGenericRecords(getSchema())
- : readFiles(getRecordClass());
- return input
- .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
- .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
- .apply(
- "Read Matches",
- FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
- .apply("Via ReadFiles", readFiles);
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .add(
- DisplayData.item("inferBeamSchema", getInferBeamSchema())
- .withLabel("Infer Beam Schema"))
- .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
- .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"))
- .addIfNotNull(
- DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
- .include("matchConfiguration", getMatchConfiguration());
- }
-
- @SuppressWarnings("unchecked")
- private static <T> AvroSource<T> createSource(
- ValueProvider<String> filepattern,
- EmptyMatchTreatment emptyMatchTreatment,
- Class<T> recordClass,
- Schema schema,
- AvroSource.@Nullable DatumReaderFactory<T> readerFactory) {
- AvroSource<GenericRecord> source =
- AvroSource.from(filepattern).withEmptyMatchTreatment(emptyMatchTreatment);
-
- if (readerFactory != null) {
- source = source.withDatumReaderFactory(readerFactory);
- }
- return recordClass == GenericRecord.class
- ? (AvroSource<T>) source.withSchema(schema)
- : source.withSchema(recordClass);
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /** Implementation of {@link #readFiles}. */
- @AutoValue
- public abstract static class ReadFiles<T>
- extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {
-
- abstract @Nullable Class<T> getRecordClass();
-
- abstract @Nullable Schema getSchema();
-
- abstract boolean getUsesReshuffle();
-
- abstract ReadFileRangesFnExceptionHandler getFileExceptionHandler();
-
- abstract long getDesiredBundleSizeBytes();
-
- abstract boolean getInferBeamSchema();
-
- abstract AvroSource.@Nullable DatumReaderFactory<T> getDatumReaderFactory();
-
- abstract Builder<T> toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder<T> {
- abstract Builder<T> setRecordClass(Class<T> recordClass);
-
- abstract Builder<T> setSchema(Schema schema);
-
- abstract Builder<T> setUsesReshuffle(boolean usesReshuffle);
-
- abstract Builder<T> setFileExceptionHandler(
- ReadFileRangesFnExceptionHandler exceptionHandler);
-
- abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
-
- abstract Builder<T> setInferBeamSchema(boolean infer);
-
- abstract Builder<T> setDatumReaderFactory(AvroSource.DatumReaderFactory<T> factory);
-
- abstract ReadFiles<T> build();
- }
-
- /**
- * Set a value for the bundle size for parallel reads. Default is 64 MB. You may want to use a
- * lower value (e.g. 1 MB) for streaming applications.
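- * <p>For example, for a streaming pipeline (1 MB; illustrative):
- * <pre>{@code
- * AvroIO.readFiles(AvroAutoGenClass.class).withDesiredBundleSizeBytes(1024 * 1024L);
- * }</pre>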
- */
- public ReadFiles<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
- return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
- }
-
- /** Specifies if a Reshuffle should run before file reads occur. */
- public ReadFiles<T> withUsesReshuffle(boolean usesReshuffle) {
- return toBuilder().setUsesReshuffle(usesReshuffle).build();
- }
-
- /** Specifies if exceptions should be logged only for streaming pipelines. */
- public ReadFiles<T> withFileExceptionHandler(
- ReadFileRangesFnExceptionHandler exceptionHandler) {
- return toBuilder().setFileExceptionHandler(exceptionHandler).build();
- }
-
- /**
- * If set to true, a Beam schema will be inferred from the Avro schema. This allows the output
- * to be used by SQL and by the schema-transform library.
- */
- public ReadFiles<T> withBeamSchemas(boolean withBeamSchemas) {
- return toBuilder().setInferBeamSchema(withBeamSchemas).build();
- }
-
- public ReadFiles<T> withDatumReaderFactory(AvroSource.DatumReaderFactory<T> factory) {
- return toBuilder().setDatumReaderFactory(factory).build();
- }
-
- @Override
- public PCollection<T> expand(PCollection<FileIO.ReadableFile> input) {
- checkNotNull(getSchema(), "schema");
- PCollection<T> read =
- input.apply(
- "Read all via FileBasedSource",
- new ReadAllViaFileBasedSource<>(
- getDesiredBundleSizeBytes(),
- new CreateSourceFn<>(
- getRecordClass(), getSchema().toString(), getDatumReaderFactory()),
- AvroCoder.of(getRecordClass(), getSchema()),
- getUsesReshuffle(),
- getFileExceptionHandler()));
- return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .add(
- DisplayData.item("inferBeamSchema", getInferBeamSchema())
- .withLabel("Infer Beam Schema"))
- .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
- .addIfNotNull(
- DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"));
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Implementation of {@link #readAll}.
- *
- * @deprecated See {@link #readAll(Class)} for details.
- */
- @Deprecated
- @AutoValue
- public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
- abstract MatchConfiguration getMatchConfiguration();
-
- abstract @Nullable Class<T> getRecordClass();
-
- abstract @Nullable Schema getSchema();
-
- abstract long getDesiredBundleSizeBytes();
-
- abstract boolean getInferBeamSchema();
-
- abstract Builder<T> toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder<T> {
- abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
-
- abstract Builder<T> setRecordClass(Class<T> recordClass);
-
- abstract Builder<T> setSchema(Schema schema);
-
- abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
-
- abstract Builder<T> setInferBeamSchema(boolean infer);
-
- abstract ReadAll<T> build();
- }
-
- /** Sets the {@link MatchConfiguration}. */
- public ReadAll<T> withMatchConfiguration(MatchConfiguration configuration) {
- return toBuilder().setMatchConfiguration(configuration).build();
- }
-
- /** Like {@link Read#withEmptyMatchTreatment}. */
- public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
- return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
- }
-
- /** Like {@link Read#watchForNewFiles}. */
- public ReadAll<T> watchForNewFiles(
- Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
- return withMatchConfiguration(
- getMatchConfiguration().continuously(pollInterval, terminationCondition));
- }
-
- /**
- * Set a value for the bundle size for parallel reads. Default is 64 MB. You may want to use a
- * lower value (e.g. 1 MB) for streaming applications.
- */
- public ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
- return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
- }
-
- /**
- * If set to true, a Beam schema will be inferred from the Avro schema. This allows the output
- * to be used by SQL and by the schema-transform library.
- */
- public ReadAll<T> withBeamSchemas(boolean withBeamSchemas) {
- return toBuilder().setInferBeamSchema(withBeamSchemas).build();
- }
-
- @Override
- public PCollection<T> expand(PCollection<String> input) {
- checkNotNull(getSchema(), "schema");
- PCollection<T> read =
- input
- .apply(FileIO.matchAll().withConfiguration(getMatchConfiguration()))
- .apply(FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
- .apply(readFiles(getRecordClass()));
- return getInferBeamSchema() ? setBeamSchema(read, getRecordClass(), getSchema()) : read;
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .add(
- DisplayData.item("inferBeamSchema", getInferBeamSchema())
- .withLabel("Infer Beam Schema"))
- .addIfNotNull(DisplayData.item("schema", String.valueOf(getSchema())))
- .addIfNotNull(DisplayData.item("recordClass", getRecordClass()).withLabel("Record Class"))
- .include("matchConfiguration", getMatchConfiguration());
- }
- }
-
- private static class CreateSourceFn<T>
- implements SerializableFunction<String, FileBasedSource<T>> {
- private final Class<T> recordClass;
- private final Supplier<Schema> schemaSupplier;
- private final AvroSource.DatumReaderFactory<T> readerFactory;
-
- CreateSourceFn(
- Class<T> recordClass, String jsonSchema, AvroSource.DatumReaderFactory<T> readerFactory) {
- this.recordClass = recordClass;
- this.schemaSupplier =
- Suppliers.memoize(
- Suppliers.compose(new JsonToSchema(), Suppliers.ofInstance(jsonSchema)));
- this.readerFactory = readerFactory;
- }
-
- @Override
- public FileBasedSource<T> apply(String input) {
- return Read.createSource(
- StaticValueProvider.of(input),
- EmptyMatchTreatment.DISALLOW,
- recordClass,
- schemaSupplier.get(),
- readerFactory);
- }
-
- private static class JsonToSchema implements Function<String, Schema>, Serializable {
- @Override
- public Schema apply(String input) {
- return new Schema.Parser().parse(input);
- }
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /** Implementation of {@link #parseGenericRecords}. */
- @AutoValue
- public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
-
- abstract @Nullable ValueProvider<String> getFilepattern();
-
- abstract MatchConfiguration getMatchConfiguration();
-
- abstract SerializableFunction<GenericRecord, T> getParseFn();
-
- abstract @Nullable Coder<T> getCoder();
-
- abstract boolean getHintMatchesManyFiles();
-
- abstract Builder<T> toBuilder();
-
- @AutoValue.Builder
- abstract static class Builder<T> {
- abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
-
- abstract Builder<T> setMatchConfiguration(MatchConfiguration matchConfiguration);
-
- abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
-
- abstract Builder<T> setCoder(Coder<T> coder);
-
- abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
-
- abstract Parse<T> build();
- }
-
- /** Reads from the given filename or filepattern. */
- public Parse<T> from(String filepattern) {
- return from(StaticValueProvider.of(filepattern));
- }
-
- /** Like {@link #from(String)}. */
- public Parse<T> from(ValueProvider<String> filepattern) {
- return toBuilder().setFilepattern(filepattern).build();
- }
-
- /** Sets the {@link MatchConfiguration}. */
- public Parse<T> withMatchConfiguration(MatchConfiguration configuration) {
- return toBuilder().setMatchConfiguration(configuration).build();
- }
-
- /** Like {@link Read#withEmptyMatchTreatment}. */
- public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
- return withMatchConfiguration(getMatchConfiguration().withEmptyMatchTreatment(treatment));
- }
-
- /** Like {@link Read#watchForNewFiles}. */
- public Parse<T> watchForNewFiles(
- Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
- return withMatchConfiguration(
- getMatchConfiguration().continuously(pollInterval, terminationCondition));
- }
-
- /** Sets a coder for the result of the parse function. */
- public Parse<T> withCoder(Coder<T> coder) {
- return toBuilder().setCoder(coder).build();
- }
-
- /** Like {@link Read#withHintMatchesManyFiles()}. */
- public Parse<T> withHintMatchesManyFiles() {
- return toBuilder().setHintMatchesManyFiles(true).build();
- }
-
- @Override
- public PCollection<T> expand(PBegin input) {
- checkNotNull(getFilepattern(), "filepattern");
- Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
-
- if (getMatchConfiguration().getWatchInterval() == null && !getHintMatchesManyFiles()) {
- return input.apply(
- org.apache.beam.sdk.io.Read.from(
- AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
- }
-
- // All other cases go through FileIO + ParseFilesGenericRecords.
- return input
- .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
- .apply("Match All", FileIO.matchAll().withConfiguration(getMatchConfiguration()))
- .apply(
- "Read Matches",
- FileIO.readMatches().withDirectoryTreatment(DirectoryTreatment.PROHIBIT))
- .apply("Via ParseFiles", parseFilesGenericRecords(getParseFn()).withCoder(coder));
- }
-
- private static <T> Coder<T> inferCoder(
- @Nullable Coder<T> explicitCoder,
- SerializableFunction<GenericRecord, T> parseFn,
- CoderRegistry coderRegistry) {
- if (explicitCoder != null) {
- return explicitCoder;
- }
- // If a coder was not specified explicitly, infer it from parse fn.
- try {
- return coderRegistry.getCoder(TypeDescriptors.outputOf(parseFn));
- } catch (CannotProvideCoderException e) {
- throw new IllegalArgumentException(
- "Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
- e);
- }
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(
- DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
- .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
- .include("matchConfiguration", getMatchConfiguration());
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /** Implementation of {@link #parseFilesGenericRecords}. */
- @AutoValue
- public abstract static class ParseFiles<T>
- extends PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> {