
Is it possible to do CDC stream UPSERT with java library? #2135

Closed
ismailsimsek opened this issue Jun 4, 2023 · 9 comments
Labels
api: bigquerystorage Issues related to the googleapis/java-bigquerystorage API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@ismailsimsek

Is it possible to do CDC stream inserts? Is this feature available in the Java library?

I am getting the following error:

    JSONArray jsonArr = new JSONArray();
    JSONObject record = new JSONObject().put("c_id", 2).put("c_string", "record-1").put("_CHANGE_TYPE", "UPSERT");
    jsonArr.put(record);
    AppendRowsResponse response = streamWriter.append(jsonArr).get();

    Exception in thread "main" com.google.cloud.bigquery.storage.v1.Exceptions$AppendSerializationError: INVALID_ARGUMENT: Append serialization failed for writer: projects/myproject-dev/datasets/stage/tables/test_CDC_stream_data_loading/_default
    	at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.append(SchemaAwareStreamWriter.java:207)
    	at com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter.append(SchemaAwareStreamWriter.java:109)
    	at com.google.cloud.bigquery.storage.v1.JsonStreamWriter.append(JsonStreamWriter.java:62)
    	at experiments.TestCDCStreamLoading.main(TestCDCStreamLoading.java:72)
@product-auto-label bot added the api: bigquerystorage label on Jun 4, 2023
@faisalhasnain

I am getting the same error.

@faisalhasnain

This CDC feature works in Python but not in Java. Any idea when it will get fixed?

@Neenu1995 added the type: feature request label on Jun 16, 2023
@ismailsimsek
Author

ismailsimsek commented Jun 21, 2023

I believe the error is related to the missing field _CHANGE_TYPE in the table schema.

It works in append mode (without failing) when unknown fields are set to be ignored: .setIgnoreUnknownFields(true)

@Neenu1995
Contributor

To use the upsert functionality, you need to specify a table schema that includes the _change_type field. You can do this by changing the constructor you use inside the DataWriter.initialize method by using link.

To pad _change_type onto the current table schema, the code can be:

    final String CHANGE_TYPE_PSEUDO_COLUMN = "_change_type";

    TableSchema updatedSchema =
        tableSchema.toBuilder()
            .addFields(
                TableFieldSchema.newBuilder()
                    .setName(CHANGE_TYPE_PSEUDO_COLUMN)
                    .setType(TableFieldSchema.Type.STRING)
                    .setMode(TableFieldSchema.Mode.NULLABLE)
                    .build())
            .build();
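For context, here is a minimal end-to-end sketch of appending an UPSERT row once the schema carries the pseudo column. The project, dataset, table, and column names are taken from the example earlier in this thread; the schema is built inline here rather than fetched from BigQuery, so adapt it to your real table:

```java
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import org.json.JSONArray;
import org.json.JSONObject;

public class CdcUpsertSketch {

  // Schema mirroring the example table, plus the CDC pseudo column.
  static TableSchema paddedSchema() {
    return TableSchema.newBuilder()
        .addFields(field("c_id", TableFieldSchema.Type.INT64))
        .addFields(field("c_string", TableFieldSchema.Type.STRING))
        .addFields(field("_CHANGE_TYPE", TableFieldSchema.Type.STRING))
        .build();
  }

  static TableFieldSchema field(String name, TableFieldSchema.Type type) {
    return TableFieldSchema.newBuilder()
        .setName(name)
        .setType(type)
        .setMode(TableFieldSchema.Mode.NULLABLE)
        .build();
  }

  public static void main(String[] args) throws Exception {
    // Hypothetical names, taken from the error message above.
    TableName table = TableName.of("myproject-dev", "stage", "test_CDC_stream_data_loading");

    // Each row carries _CHANGE_TYPE alongside the regular columns.
    JSONObject record = new JSONObject()
        .put("c_id", 2)
        .put("c_string", "record-1")
        .put("_CHANGE_TYPE", "UPSERT");
    JSONArray rows = new JSONArray().put(record);

    try (BigQueryWriteClient client = BigQueryWriteClient.create();
        JsonStreamWriter writer =
            JsonStreamWriter.newBuilder(table.toString(), paddedSchema(), client).build()) {
      AppendRowsResponse response = writer.append(rows).get();
      if (response.hasError()) {
        System.out.println("append failed: " + response.getError().getMessage());
      }
    }
  }
}
```

Note this targets the table's _default stream (the behavior of the JsonStreamWriter.newBuilder overload that takes a table name and an explicit TableSchema), which is what BigQuery's CDC ingestion expects.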

@faisalhasnain

Yeah, that's what I did to get it working. Thanks for sharing :)

@PhongChuong
Contributor

We recently added a CDC upsert sample, which can be found here:
samples/snippets/src/main/java/com/example/bigquerystorage/JsonWriterStreamCdc.java

@augi

augi commented Mar 19, 2024

@Neenu1995 @PhongChuong We are publishing JSON to Pub/Sub, and the subscription uses the Use Table Schema setting. We include the _CHANGE_TYPE field in the JSON, but it is still unrecognized.

Does this mean that we should alter the BigQuery table to have the _change_type column?

EDIT: This is not possible.

@augi

augi commented Mar 19, 2024

Just for the record, the issue was that the BigQuery table didn't have a primary key specified. The component responsible for writing then probably doesn't expect the _CHANGE_TYPE field.
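For reference, BigQuery's CDC ingestion requires the target table to declare a non-enforced primary key, and it is typically paired with a max_staleness table option; without the primary key, _CHANGE_TYPE is rejected as described above. A DDL sketch reusing the table and column names from this thread (the staleness interval is an arbitrary example value):

```sql
CREATE TABLE `myproject-dev.stage.test_CDC_stream_data_loading` (
  c_id INT64,
  c_string STRING,
  PRIMARY KEY (c_id) NOT ENFORCED
)
OPTIONS (max_staleness = INTERVAL 15 MINUTE);
```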

@AndyCorlin

Thanks for the tips about the primary key and about adding _CHANGE_TYPE to the schema!

I found code in com.google.cloud.bigquery.storage.v1.SchemaAwareStreamWriter showing how to fetch the schema from BigQuery, and then used @Neenu1995's code to add the pseudo column before creating the JsonStreamWriter:

    String streamName = tableName + "/_default";
    GetWriteStreamRequest writeStreamRequest =
        GetWriteStreamRequest.newBuilder()
            .setName(streamName)
            .setView(WriteStreamView.FULL)
            .build();
    WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);
    TableSchema tableSchema =
        writeStream
            .getTableSchema()
            .toBuilder()
            .addFields(
                TableFieldSchema.newBuilder()
                    .setName(CHANGE_TYPE_PSEUDO_COLUMN)
                    .setType(TableFieldSchema.Type.STRING)
                    .setMode(TableFieldSchema.Mode.NULLABLE)
                    .build())
            .build();

    // See bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.html
    return JsonStreamWriter.newBuilder(tableName, tableSchema, client).build();
