
WIP: 128: produce using NATIVE_AVRO in the SQL sink when the avro format is chosen, so that Pulsar consumers can consume the records #217

Open
wants to merge 258 commits into develop
Conversation


@imaffe imaffe commented Sep 7, 2022

closes #126
closes #128

TODO: add more descriptions

Myasuka and others added 30 commits March 16, 2022 00:21
Some InternalPriorityQueue implementations need a correct key/group
set before performing poll() or remove().

In particular, ChangelogKeyGroupedPriorityQueue logs key group so that
state changes can be re-distributed or shuffled.

This change re-orders queue.poll and keyContext.setCurrentKey.
Implements the SEARCH operator in the codegen and removes
the scalar implementation of IN and NOT_IN. Now every scalar
IN/NOT_IN using a constant set is implemented through SEARCH
(following Calcite's development on the topic CALCITE-4173)
and plans will only have SEARCH.

This closes apache#19001.
Previously, the tmpWorkingDirectory was created in the current working
directory, and as a result there were directories created in the root
directories of the modules, i.e. `flink-table/flink-table-planner` which
were not cleaned up with `mvn clean`.
The user explicitly marked the cleanup retry logic to
terminate after a certain number of attempts. This should be
considered desired behavior and shouldn't make the cluster
fail fatally.
We want to retry indefinitely if nothing is specified.
…ined sink operators

Since the topology has changed between Flink 1.14 and 1.15, it might
happen that stateful upgrades are not possible if no prior operator uids
were set. With this commit, users can set operator uid hashes for the
respective operators.
…attern

Since there is no dedicated committer operator in Flink 1.14 it is safe
to use the uid pattern of 1.13 to ease upgrades from Flink 1.13 to 1.15.
…nly the path of Path instance

The issue before the fix was that using getPath would strip
off the scheme information, which causes problems in situations
where the FileSystem is not the default FileSystem.
…clean up redundant content of bounded and unbounded data.

- mvn dependency
- using namespace in schema for reflect records

[FLINK-26604][doc] bug fix

(cherry picked from commit 579e554). This closes apache#19134
… if the JobGraph is not put into the JobGraphStore, yet

This can happen if cleanup is triggered after a
failover of a dirty JobResultStore entry (i.e. of
a globally-terminated job). In that case, no
recovery of the JobGraph happens and, therefore, no
JobGraph is added to the internal addedJobGraphs
collection.

This required KubernetesStateHandleStore.releaseAndTryRemove
to work for non-existing state as well. The ZooKeeperStateHandleStore
implementation is already idempotent in this matter.

ZooKeeperStateHandleStore.releaseAndTryRemove already works like that.
syhily and others added 21 commits July 22, 2022 03:37
…ce. Avoid hanging on low incoming message rates.
drop extra empty line

Apply suggestions from code review

Co-authored-by: Huanli Meng <48120384+Huanli-Meng@users.noreply.github.com>
@imaffe imaffe requested a review from a team as a code owner September 7, 2022 04:57
@@ -62,6 +68,7 @@ public PulsarTableSerializationSchema(
this.valueSerialization = checkNotNull(valueSerialization);
this.valueFieldGetters = checkNotNull(valueFieldGetters);
this.writableMetadata = checkNotNull(writableMetadata);
// this.pulsarSchema = getPulsarSchemaFromSerialization(valueSerialization);

remember to delete this line

@@ -89,7 +96,8 @@ public PulsarMessage<?> serialize(RowData consumedRow, PulsarSinkContext sinkCon
}

byte[] serializedData = valueSerialization.serialize(valueRow);
messageBuilder.value(Schema.BYTES, serializedData);
// messageBuilder.value(Schema.BYTES, serializedData);

remember to remove comments
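The diff above drops the plain Schema.BYTES value call; per the PR title, the intent is to attach Pulsar's NATIVE_AVRO schema to the message so that plain Pulsar consumers can resolve the Avro schema and decode the payload. A minimal stand-in sketch of that change, with hypothetical FakeSchema/FakeMessageBuilder types in place of the real Pulsar client classes:

```java
// Hedged sketch: the real code would use org.apache.pulsar.client.api.Schema
// (e.g. Schema.NATIVE_AVRO in Pulsar 2.9+) and the sink's message builder.
// FakeSchema and FakeMessageBuilder here are illustrative stand-ins only.
public class NativeAvroSketch {
    // Stand-in for a Pulsar Schema<byte[]> carrying schema metadata.
    record FakeSchema(String name) {}

    // Stand-in for the sink's message builder: records which schema
    // is attached alongside the pre-serialized payload.
    static class FakeMessageBuilder {
        FakeSchema schema;
        byte[] payload;

        FakeMessageBuilder value(FakeSchema schema, byte[] payload) {
            this.schema = schema;
            this.payload = payload;
            return this;
        }
    }

    public static void main(String[] args) {
        byte[] serializedData = {1, 2, 3}; // already Avro-encoded by the format
        FakeMessageBuilder builder = new FakeMessageBuilder();
        // Before: builder.value(new FakeSchema("BYTES"), serializedData);
        // After (intended): attach the native Avro schema instead of raw bytes.
        builder.value(new FakeSchema("NATIVE_AVRO"), serializedData);
        System.out.println(builder.schema.name());
    }
}
```

The payload bytes are unchanged either way; only the schema metadata attached to the message differs, which is what lets downstream consumers decode it.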

private static Schema getPulsarSchemaFromSerialization(
        SerializationSchema<RowData> serializationSchema) {
    if (serializationSchema instanceof AvroRowDataSerializationSchema) {
        SerializationSchema<GenericRecord> nestedSchema =
                ((AvroRowDataSerializationSchema) serializationSchema).getNestedSchema();
        org.apache.avro.Schema avroSchema =
                ((AvroSerializationSchema) nestedSchema).getSchema();

this would harm the performance.


can we initialize the schema in the open() method and make it transient?
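A minimal sketch of that suggestion, in plain Java with hypothetical names (CachedSchemaSerializer and deriveSchema are not from the PR): the expensive schema derivation runs once in open(), and marking the field transient keeps the derived object out of the serialized job graph.

```java
import java.io.Serializable;

// Illustrative sketch only: mirrors Flink's SerializationSchema#open lifecycle,
// where open() is called once per task before any serialize() call.
public class CachedSchemaSerializer implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String formatName;        // serializable configuration
    private transient String cachedSchema;  // derived once, not shipped with the job

    public CachedSchemaSerializer(String formatName) {
        this.formatName = formatName;
    }

    // Called once before processing starts; does the expensive work up front.
    public void open() {
        this.cachedSchema = deriveSchema(formatName);
    }

    // Hot path: no per-record schema derivation, just use the cached value.
    public String serialize(String value) {
        return cachedSchema + ":" + value;
    }

    // Stand-in for the costly instanceof/cast chain in the snippet above.
    private static String deriveSchema(String formatName) {
        return "schema-for-" + formatName;
    }

    public static void main(String[] args) {
        CachedSchemaSerializer s = new CachedSchemaSerializer("avro");
        s.open();
        System.out.println(s.serialize("row1"));
    }
}
```

This addresses the "harm the performance" concern above: the reflection-style checks happen once at task startup instead of on every record.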

@imaffe imaffe force-pushed the affe/128-fetch-EMPTY branch from dd5537e to d4559e0 on September 7, 2022 09:50

imaffe commented Sep 7, 2022

The current fix is not OK, because we changed other components as well. We need to figure out an approach that does not modify AvroRowDataSerializationSchema.
