WIP: 128: produce using NATIVE_AVRO in SQL sink when choosing Avro format to allow Pulsar consumers to consume #217
base: develop
Conversation
…rigger failure if not all tasks are started
Some InternalPriorityQueue implementations need the correct key/group to be set before performing poll() or remove(). In particular, ChangelogKeyGroupedPriorityQueue logs the key group so that state changes can be re-distributed or shuffled. This change re-orders queue.poll and keyContext.setCurrentKey.
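The re-ordering can be illustrated with a minimal, hypothetical sketch (plain Java, not the actual Flink classes): the key must be set from the head element *before* the poll, so that a changelog-backed queue logs the removal under the right key.

```java
import java.util.PriorityQueue;

// Stand-in for Flink's key context: remembers the "current key".
class KeyContext {
    Object currentKey;
    void setCurrentKey(Object key) { this.currentKey = key; }
}

class TimerServiceSketch {
    final PriorityQueue<String> queue = new PriorityQueue<>();
    final KeyContext keyContext = new KeyContext();

    String pollNextTimer() {
        // Before the fix: queue.poll() ran first, so a changelog-backed
        // queue would log the state change under a stale key.
        String timer = queue.peek();
        if (timer == null) {
            return null;
        }
        keyContext.setCurrentKey(timer); // 1. set the key/group first
        return queue.poll();             // 2. then poll under the correct key
    }
}
```

Here the element itself stands in for its key; in the real code the key is derived from the queue element before `setCurrentKey` is called.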
Implements the SEARCH operator in the codegen and removes the scalar implementation of IN and NOT_IN. Now every scalar IN/NOT_IN over a constant set is implemented through SEARCH (following Calcite's development on the topic, CALCITE-4173), and plans will only contain SEARCH. This closes apache#19001.
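The idea behind SEARCH can be sketched in plain Java (hypothetical, not Calcite's API): instead of compiling `x IN (1, 5, 7, 9)` into a chain of OR'ed equality checks, the constant values are collapsed into a single precomputed search argument ("Sarg") and the predicate becomes one membership probe.

```java
import java.util.Set;

class SearchSketch {
    // Precomputed constant search argument, built once at plan time.
    static final Set<Integer> SARG = Set.of(1, 5, 7, 9);

    // Roughly: SEARCH(v, Sarg[1, 5, 7, 9]) instead of
    // v = 1 OR v = 5 OR v = 7 OR v = 9.
    static boolean inViaSearch(int v) {
        return SARG.contains(v);
    }

    static boolean notInViaSearch(int v) {
        return !SARG.contains(v);
    }
}
```

Calcite's real Sarg additionally supports ranges and NULL semantics; the sketch only shows the point-set case.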
… in BoundedSourceITCase
Previously, the tmpWorkingDirectory was created in the current working directory; as a result, directories were created in the root directories of the modules (e.g. `flink-table/flink-table-planner`) that were not cleaned up by `mvn clean`.
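A minimal sketch of the fix, assuming the directory name and layout are illustrative: place the temporary working directory under the module's `target/` build output, which `mvn clean` removes, rather than in the bare working directory.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

class TmpDirSketch {
    // Creates the test working directory under target/ so it is
    // removed by `mvn clean` along with the rest of the build output.
    static Path createTmpWorkingDirectory() throws IOException {
        Path base = Paths.get("target");
        Files.createDirectories(base);
        return Files.createTempDirectory(base, "tmpWorkingDirectory");
    }
}
```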
The user explicitly configured the cleanup retry logic to terminate after a certain number of attempts. This should be treated as desired behavior and must not cause the cluster to fail fatally. If nothing is specified, we retry indefinitely.
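The retry policy described above can be sketched as follows (names and types are illustrative, not Flink's actual retry strategy classes): an absent limit means retry forever, and exhausting a configured limit is reported as an ordinary failure, not a fatal one.

```java
import java.util.OptionalInt;
import java.util.function.BooleanSupplier;

class CleanupRetrySketch {
    // Runs cleanup until it succeeds or the configured attempt limit
    // is exhausted. An empty OptionalInt means "retry indefinitely".
    static boolean runCleanup(BooleanSupplier cleanup, OptionalInt maxAttempts) {
        int attempt = 0;
        while (maxAttempts.isEmpty() || attempt < maxAttempts.getAsInt()) {
            attempt++;
            if (cleanup.getAsBoolean()) {
                return true; // cleanup succeeded
            }
        }
        // Exhausting the user-configured attempts is desired behavior;
        // the caller must not escalate this into a fatal cluster error.
        return false;
    }
}
```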
…ponential-delay.attempts
…veral places This closes apache#19089.
…ined sink operators. Since the topology has changed between Flink 1.14 and 1.15, stateful upgrades might not be possible if no prior operator uids were set. With this commit, users can set operator uid hashes for the respective operators.
…attern. Since there is no dedicated committer operator in Flink 1.14, it is safe to use the uid pattern of 1.13 to ease upgrades from Flink 1.13 to 1.15.
… jar files on Windows
Deprecate `nullCheck` and `decimalContext`.
…nly the path of the Path instance. The issue before the fix was that using getPath would strip off the scheme information, which causes problems in situations where the FileSystem is not the default FileSystem.
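The scheme-stripping problem can be demonstrated with `java.net.URI` (a stand-in for Flink's `Path`; the `hdfs://` URI is an illustrative example): `getPath()` keeps only the path component, so a file on a non-default filesystem would later be resolved against the default one.

```java
import java.net.URI;

class SchemeSketch {
    public static void main(String[] args) {
        URI location = URI.create("hdfs://namenode:8020/data/file");

        // What the buggy code effectively used: the scheme and
        // authority are gone, so "hdfs" can no longer be recovered.
        String stripped = location.getPath();   // "/data/file"

        // What the fix keeps: the full URI, scheme included.
        String full = location.toString();      // "hdfs://namenode:8020/data/file"

        System.out.println(stripped + " vs " + full);
    }
}
```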
…clean up redundant content of bounded and unbounded data.
- mvn dependency
- using namespace in schema for reflect records
[FLINK-26604][doc] bug fix (cherry picked from commit 579e554). This closes apache#19134.
… if the JobGraph is not put into the JobGraphStore, yet. This can happen if cleanup is triggered after a failover of a dirty JobResultStore entry (i.e. of a globally-terminated job). In that case, no recovery of the JobGraph happens and, therefore, no JobGraph is added to the internal addedJobGraphs collection. This required KubernetesStateHandleStore.releaseAndTryRemove to work for non-existing state as well; ZooKeeperStateHandleStore.releaseAndTryRemove is already idempotent in this matter.
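The required idempotency can be sketched with a toy store (hypothetical, not the real Kubernetes/ZooKeeper state handle stores): removing an entry that was never added still succeeds, so cleanup is safe to trigger even when no JobGraph was recovered.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StateHandleStoreSketch {
    final Map<String, Object> handles = new ConcurrentHashMap<>();

    // Idempotent removal: returns true whether or not the entry
    // existed, so a cleanup after failover of a dirty JobResultStore
    // entry does not fail on a JobGraph that was never stored.
    boolean releaseAndTryRemove(String key) {
        handles.remove(key);
        return true;
    }
}
```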
…ith pipeline jars This closes apache#19133.
…sar source connector.
…ting performance for PulsarSink.
…ue length option.
…ce. Avoid hanging on small message income rates.
drop extra empty line
Apply suggestions from code review
Co-authored-by: Huanli Meng <48120384+Huanli-Meng@users.noreply.github.com>
@@ -62,6 +68,7 @@ public PulsarTableSerializationSchema(
        this.valueSerialization = checkNotNull(valueSerialization);
        this.valueFieldGetters = checkNotNull(valueFieldGetters);
        this.writableMetadata = checkNotNull(writableMetadata);
        // this.pulsarSchema = getPulsarSchemaFromSerialization(valueSerialization);
Remember to delete this line.
@@ -89,7 +96,8 @@ public PulsarMessage<?> serialize(RowData consumedRow, PulsarSinkContext sinkCon
        }

        byte[] serializedData = valueSerialization.serialize(valueRow);
        messageBuilder.value(Schema.BYTES, serializedData);
        // messageBuilder.value(Schema.BYTES, serializedData);
Remember to remove the comments.
private static Schema getPulsarSchemaFromSerialization(SerializationSchema<RowData> serializationSchema) {
    if (serializationSchema instanceof AvroRowDataSerializationSchema) {
        SerializationSchema<GenericRecord> nestedSchema = ((AvroRowDataSerializationSchema) serializationSchema).getNestedSchema();
        org.apache.avro.Schema avroSchema = ((AvroSerializationSchema) nestedSchema).getSchema();
This would harm performance.
Can we initialize the schema in the open() method and make it transient?
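The reviewer's suggestion can be sketched as follows (a simplified, hypothetical stand-in for the real PulsarTableSerializationSchema; field and method names are illustrative): the derived schema is cached in a transient field that is populated once in open(), so it is neither serialized with the operator nor recomputed per record.

```java
import java.io.Serializable;

class PulsarTableSerializationSchemaSketch implements Serializable {
    // Serialized with the sink; stands in for the real SerializationSchema.
    private final String valueSerializationId;

    // transient: excluded from Java serialization, rebuilt in open()
    // on each task instead of being derived once per record.
    transient Object pulsarSchema;

    PulsarTableSerializationSchemaSketch(String valueSerializationId) {
        this.valueSerializationId = valueSerializationId;
    }

    void open() {
        // Derived once per task at initialization time.
        this.pulsarSchema = "schema derived from " + valueSerializationId;
    }
}
```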
Force-pushed from dd5537e to d4559e0.
The current fix is not OK, because we changed other components as well. We need to figure out a way that does not modify the AvroRowDataSerializationSchema.
closes #126
closes #128
TODO: add more descriptions