[BEAM-14036] Read Configuration for Pub/Sub SchemaTransform #17730

Merged
@@ -0,0 +1,171 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

/**
 * Configuration for reading from Pub/Sub.
 *
 * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
 * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
 * repository.
 */
@Experimental
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class PubsubSchemaTransformReadConfiguration {

  /** Instantiates a {@link PubsubSchemaTransformReadConfiguration.Builder}. */
  public static Builder builder() {
    return new AutoValue_PubsubSchemaTransformReadConfiguration.Builder();
  }

  /** The expected schema of the Pub/Sub message. */
  public abstract Schema getDataSchema();

  /**
   * The Pub/Sub topic path to write failures.
   *
   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the dead
   * letter queue topic string.
   */
  @Nullable
  public abstract String getDeadLetterQueue();

Contributor

Please change this to getDeadLetterTopic to be consistent with the Pub/Sub read transform.

public Read<T> withDeadLetterTopic(String deadLetterTopic) {

Contributor Author

@damondouglas, Jun 2, 2022

@chamikaramj (cc: @angoenka) Thank you again for reviewing. May we consider leaving the name getDeadLetterQueue?

In a previous use of AutoValueSchema with AutoValue in a different project, I observed that the getters needed the get prefix for serialization to work. For example, I had a property called fooName. When I named the getter fooName(), its return value was null when invoked in the context of a DoFn. However, when I renamed the getter to getFooName(), the return value was what I expected. I am not sure whether my observation is still valid.

A further reason to prefer get over with: the configuration class is designed to hold the data needed by its corresponding SchemaProvider. The method performs none of the action implied by the preposition with; the getter simply returns data.

Contributor

I see. I think the confusion here is that this property does not correspond to the PubSubIO.getDeadLetterTopic I referenced but maps to the dlq property below.

Is that correct?
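
The getter-naming observation in this thread can be illustrated with a minimal sketch of a JavaBean-style get-prefix convention. This is a simplified stand-in written for illustration, not Beam's actual code; the class `GetterNameSketch` and the method `fieldNameFor` are hypothetical names. It only shows why a method named `fooName()` would not be recognized as a property under such a convention, while `getFooName()` would map to `fooName`.

```java
// Hypothetical illustration of a get-prefix naming convention; not Beam's implementation.
final class GetterNameSketch {

  /**
   * Returns the property name implied by a JavaBean-style getter name, or null when the method
   * name does not follow the get-prefix convention.
   */
  static String fieldNameFor(String methodName) {
    String prefix = "get";
    // The name must start with the prefix and have at least one character after it.
    if (!methodName.startsWith(prefix) || methodName.length() == prefix.length()) {
      return null;
    }
    // The first character after the prefix must be upper case, as in getFooName.
    char first = methodName.charAt(prefix.length());
    if (!Character.isUpperCase(first)) {
      return null;
    }
    // Lower-case that character and append the rest of the name unchanged.
    return Character.toLowerCase(first) + methodName.substring(prefix.length() + 1);
  }

  public static void main(String[] args) {
    System.out.println(fieldNameFor("getDeadLetterQueue"));
    System.out.println(fieldNameFor("fooName"));
  }
}
```

Under this assumed convention, keeping the get prefix on every accessor of the configuration class keeps each property discoverable by name.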


  /**
   * The expected format of the Pub/Sub message.
   *
   * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer} from
   * {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
   */
  @Nullable
  public abstract String getFormat();

Contributor

I don't see format, protoClass, thriftClass attributes in the original Read config.

Contributor Author

@damondouglas, Jun 2, 2022

Contributor

These again map to PubSubSchemaIO, not the original PubSubIO. Is the plan here to make the Pub/Sub SchemaTransform consistent with the existing Pub/Sub SchemaIO? I'm not sure.

@TheNeuralBit @dpcollins-google WDYT? Should we just be consistent with the Pub/Sub SchemaIO implementation here, or should SchemaTransform be a different design?

Contributor Author

@damondouglas, Jun 10, 2022

> These again map to PubSubSchemaIO not the original PubSubIO. Is that plan here to make Pub/Sub SchemaTransform just consistent with existing Pub/Sub SchemaIO? I'm not sure.
>
> @TheNeuralBit @dpcollins-google WDYT? Should we just be consistent with Pub/Sub SchemaIO implementation here or should SchemaTransform be a different design?

The questions about the plan, and about whether to replicate SchemaIO like for like, are critical, blocking design decisions that relate to this thread as well. I will hold off on any changes to this PR until we get the feedback needed. Thank you again.


  /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */
  @Nullable
  public abstract String getProtoClass();

  /**
   * The subscription from which to read Pub/Sub messages.
   *
   * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of
   * the subscription string.
   */
  @Nullable
  public abstract String getSubscription();

  /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
  @Nullable
  public abstract String getThriftClass();

  /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
  @Nullable
  public abstract String getThriftProtocolFactoryClass();

  /**
   * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
   * attributes, specifies the name of the attribute that contains the timestamp.
   */
  @Nullable
  public abstract String getTimestampAttribute();

  /**
   * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub message
   * attributes, specifies the name of the attribute containing the unique identifier.
   */
  @Nullable
  public abstract String getIdAttribute();

  /**
   * The topic from which to read Pub/Sub messages.
   *
   * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
   * topic string.
   */
  @Nullable
  public abstract String getTopic();

  @AutoValue.Builder
  public abstract static class Builder {

    /** The expected schema of the Pub/Sub message. */
    public abstract Builder setDataSchema(Schema value);

    /**
     * The Pub/Sub topic path to write failures.
     *
     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
     * dead letter queue topic string.
     */
    public abstract Builder setDeadLetterQueue(String value);

    /**
     * The expected format of the Pub/Sub message.
     *
     * <p>Used to retrieve the {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializer}
     * from {@link org.apache.beam.sdk.schemas.io.payloads.PayloadSerializers}.
     */
    public abstract Builder setFormat(String value);

    /** Used by the ProtoPayloadSerializerProvider when serializing from a Pub/Sub message. */
    public abstract Builder setProtoClass(String value);

    /**
     * The subscription from which to read Pub/Sub messages.
     *
     * <p>See {@link PubsubIO.PubsubSubscription#fromPath(String)} for more details on the format of
     * the subscription string.
     */
    public abstract Builder setSubscription(String value);

    /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
    public abstract Builder setThriftClass(String value);

    /** Used by the ThriftPayloadSerializerProvider when serializing from a Pub/Sub message. */
    public abstract Builder setThriftProtocolFactoryClass(String value);

    /**
     * When reading from Cloud Pub/Sub where record timestamps are provided as Pub/Sub message
     * attributes, specifies the name of the attribute that contains the timestamp.
     */
    public abstract Builder setTimestampAttribute(String value);

    /**
     * When reading from Cloud Pub/Sub where unique record identifiers are provided as Pub/Sub
     * message attributes, specifies the name of the attribute containing the unique identifier.
     */
    public abstract Builder setIdAttribute(String value);

    /**
     * The topic from which to read Pub/Sub messages.
     *
     * <p>See {@link PubsubIO.PubsubTopic#fromPath(String)} for more details on the format of the
     * topic string.
     */
    public abstract Builder setTopic(String value);

    /** Builds a {@link PubsubSchemaTransformReadConfiguration} instance. */
    public abstract PubsubSchemaTransformReadConfiguration build();
  }
}
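
The builder pattern in this file can be sketched without the AutoValue and Beam dependencies. The hand-written class below is a hypothetical stand-in (`ReadConfigSketch` is not part of the PR) that mirrors three of the nullable properties. The `hasSingleSource` check is an assumption, not something the diff enforces: it reflects that a Pub/Sub read takes either a topic or a subscription, so a caller may want to verify that exactly one of the two is set. The project, topic, and subscription names are placeholders.

```java
// Hypothetical, dependency-free stand-in for the AutoValue-generated configuration class.
final class ReadConfigSketch {
  private final String topic; // may be null
  private final String subscription; // may be null
  private final String deadLetterQueue; // may be null

  private ReadConfigSketch(Builder builder) {
    this.topic = builder.topic;
    this.subscription = builder.subscription;
    this.deadLetterQueue = builder.deadLetterQueue;
  }

  static Builder builder() {
    return new Builder();
  }

  String getTopic() {
    return topic;
  }

  String getSubscription() {
    return subscription;
  }

  String getDeadLetterQueue() {
    return deadLetterQueue;
  }

  /** Assumed check: true when exactly one of topic and subscription is set. */
  boolean hasSingleSource() {
    return (topic == null) != (subscription == null);
  }

  static final class Builder {
    private String topic;
    private String subscription;
    private String deadLetterQueue;

    Builder setTopic(String value) {
      this.topic = value;
      return this;
    }

    Builder setSubscription(String value) {
      this.subscription = value;
      return this;
    }

    Builder setDeadLetterQueue(String value) {
      this.deadLetterQueue = value;
      return this;
    }

    ReadConfigSketch build() {
      return new ReadConfigSketch(this);
    }
  }

  public static void main(String[] args) {
    // Placeholder paths in the projects/<project>/... form.
    ReadConfigSketch config =
        ReadConfigSketch.builder()
            .setSubscription("projects/my-project/subscriptions/my-subscription")
            .setDeadLetterQueue("projects/my-project/topics/my-dead-letter-topic")
            .build();
    System.out.println(config.hasSingleSource());
  }
}
```

In the real class, AutoValue generates the builder implementation from the abstract setters, so only the abstract declarations shown in the diff need to be written by hand.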