Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Snowpipe Streaming] Fix IndexOutOfBoundException thrown when offsets are not continous during schema-evolution #1037

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

sudeshwasnik
Copy link
Contributor

@sudeshwasnik sudeshwasnik commented Dec 21, 2024

Connector Exception stacktrace :

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:657)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:348)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:302)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.IndexOutOfBoundsException: Index 4293 out of bounds for length 3159
at java.base/jdk.internal.util.Preconditions.outOfBounds(Preconditions.java:100)
at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Preconditions.java:106)
at java.base/jdk.internal.util.Preconditions.checkIndex(Preconditions.java:302)
at java.base/java.util.Objects.checkIndex(Objects.java:385)
at java.base/java.util.ArrayList.get(ArrayList.java:427)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$StreamingBuffer.getSinkRecord(TopicPartitionChannel.java:1341)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:697)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel$InsertRowsApiResponseSupplier.get(TopicPartitionChannel.java:625)
at dev.failsafe.Functions.lambda$toCtxSupplier$11(Functions.java:243)
at dev.failsafe.Functions.lambda$get$0(Functions.java:46)
at dev.failsafe.internal.FallbackExecutor.lambda$apply$0(FallbackExecutor.java:51)
at dev.failsafe.SyncExecutionImpl.executeSync(SyncExecutionImpl.java:182)
at dev.failsafe.FailsafeExecutor.call(FailsafeExecutor.java:438)
at dev.failsafe.FailsafeExecutor.get(FailsafeExecutor.java:115)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRowsWithFallback(TopicPartitionChannel.java:619)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertBufferedRecords(TopicPartitionChannel.java:545)
at com.snowflake.kafka.connector.internal.streaming.TopicPartitionChannel.insertRecordToBuffer(TopicPartitionChannel.java:423)
at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:325)
at com.snowflake.kafka.connector.internal.streaming.SnowflakeSinkServiceV2.insert(SnowflakeSinkServiceV2.java:292)
at com.snowflake.kafka.connector.SnowflakeSinkTask.put(SnowflakeSinkTask.java:313)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:623)
... 11 more

Due to this connector could not evolve-schema (even if snowflake.role.name had correct permissions)

Reproduction on ITs : #1036


What was the bug ?

This computation is not right.
When Offsets are continous, lets checkout records, offsets and sinkRecords objects
sinkRecords : [SinkRecord<2000>, SinkRecord<2001> .... SinkRecord<2100>] (101 records)
records : [Map<2000>, .... Map<2100>]
offsets : [2000, ... 2100].

For idx = 10

long originalSinkRecordIdx = offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();

=> long originalSinkRecordIdx = 2010 - 2000 = 10\


If offsets are not continous (with gaps, like when using FilterSMT) ->
sinkRecords : [SinkRecord<2000>, SinkRecord<2002> .... SinkRecord<2100>] (51 records)
records : [Map<2000>, Map<2002>, .... Map<2100>]
offsets : [2000, 2002,... 2100].

For idx = 10

long originalSinkRecordIdx = offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();

=> long originalSinkRecordIdx = 2020 - 2000 = 20 (incorrect).
[we want SinkRecord at 10th index].


Pre-review checklist

  • This change should be part of a Behavior Change Release. See go/behavior-change.
  • This change has passed Merge gate tests
  • Snowpipe Changes
  • Snowpipe Streaming Changes
  • This change is TEST-ONLY
  • This change is README/Javadocs only
  • This change is protected by a config parameter <PARAMETER_NAME> eg snowflake.ingestion.method.
    • Yes - Added end to end and Unit Tests.
    • No - Suggest why it is not param protected (This is a bug-fix)
  • Is his change protected by parameter <PARAMETER_NAME> on the server side?
    • The parameter/feature is not yet active in production (partial rollout or PrPr, see Changes for Unreleased Features and Fixes).
    • If there is an issue, it can be safely mitigated by turning the parameter off. This is also verified by a test (See go/ppp).

Urgency

This review is high priority

@sudeshwasnik sudeshwasnik requested a review from a team as a code owner December 21, 2024 10:54
@sudeshwasnik
Copy link
Contributor Author

sudeshwasnik commented Dec 21, 2024

hi @sfc-gh-mbobowski @sfc-gh-achyzy could you please review this quick small bug-fix ?
Thanks !

[Please guide me on how to sign CLA for this repository, here's my emailID : swasnik@confluent.io. Thanks !]

@sudeshwasnik sudeshwasnik changed the title Fix indexOutOfBoundException [Snowpipe Streaming] Fix IndexOutOfBoundException thrown when offsets are not continous during schema-evolution Dec 21, 2024
@sudeshwasnik
Copy link
Contributor Author

hi @sfc-gh-achyzy @sfc-gh-mbobowski @sfc-gh-gjachimko Requesting your reviews on this small bug-fix, thanks !

1 similar comment
@sudeshwasnik
Copy link
Contributor Author

hi @sfc-gh-achyzy @sfc-gh-mbobowski @sfc-gh-gjachimko Requesting your reviews on this small bug-fix, thanks !

Copy link
Contributor

@sfc-gh-dseweryn sfc-gh-dseweryn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the contribution! Good catch, just two minor things and we should be good to go

@sudeshwasnik
Copy link
Contributor Author

Thank you @sfc-gh-dseweryn and @sfc-gh-akowalczyk for review and approval !
I don't have enough permissions to merge, could you please merge it ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants