[SPARK-45430] Fix for FramelessOffsetWindowFunction when IGNORE NULLS and offset > rowCount #43236
}
inputIndex = offset
The root cause is that findNextRowWithNonNullInput requires assert(offset <= input.length). I think we can simplify the logic here:
if (offset > rows.length) {
  fillDefaultValue(EmptyRow)
} else {
  resetStates(rows)
  if (ignoreNulls) {
    findNextRowWithNonNullInput()
  } else {
    // drain the first few rows if offset is larger than zero
    while (inputIndex < offset) {
      if (inputIterator.hasNext) inputIterator.next()
      inputIndex += 1
    }
    inputIndex = offset
  }
}
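To make the root cause concrete, here is a minimal sketch of a scan guarded by the same kind of precondition. It is a toy model, not Spark's code: the object `SkipNonNullSketch`, the function `findNextNonNull`, and the use of `Vector[Option[Int]]` in place of rows are all assumptions made for illustration.

```scala
object SkipNonNullSketch {
  // Hypothetical stand-in for the frame's scan: walks `input` until `offset`
  // non-null values have been seen and returns the last one seen.
  // The assert models the precondition quoted in the review comment.
  def findNextNonNull(input: Vector[Option[Int]], offset: Int): Option[Int] = {
    assert(offset <= input.length, s"offset $offset exceeds row count ${input.length}")
    var seen = 0
    var index = 0
    var selected: Option[Int] = None
    while (seen < offset && index < input.length) {
      if (input(index).isDefined) {
        selected = input(index)
        seen += 1
      }
      index += 1
    }
    // Fewer than `offset` non-null values exist, so there is nothing to select.
    if (seen < offset) None else selected
  }
}
```

With three rows, an offset of 2 is scanned normally, while an offset of 5 trips the assertion before any row is read, which is the shape of the failure discussed in this thread.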
Thank you, I considered this, but unfortunately it does not work. The actual projection (or default value) is created in doWrite, so prepare needs to initialize variables such as input, inputIndex, etc.
I tested with the code I mentioned, and it works well:
override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
  resetStates(rows)
  if (offset > rows.length) {
    fillDefaultValue(EmptyRow)
  } else {
    resetStates(rows)
    if (ignoreNulls) {
      findNextRowWithNonNullInput()
    } else {
      // drain the first few rows if offset is larger than zero
      while (inputIndex < offset) {
        if (inputIterator.hasNext) inputIterator.next()
        inputIndex += 1
      }
      inputIndex = offset
    }
  }
}
Yes, it works when ignoreNulls == true but breaks when ignoreNulls == false. In particular, it will trigger the last else branch in doWrite:
else {
  (current: InternalRow) =>
    if (inputIndex >= 0 && inputIndex < input.length) {
      val r = WindowFunctionFrame.getNextOrNull(inputIterator)
      projection(r)
    } else {
      // Use default values since the offset row does not exist.
      fillDefaultValue(current)
    }
    inputIndex += 1
}
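The branch above depends on inputIndex having been positioned before the first row is written. A small sketch of that dependency follows, assuming a simplified model in which rows are `Option[Int]` values; `OffsetLookupSketch` and `offsetLookup` are hypothetical names, not part of Spark.

```scala
object OffsetLookupSketch {
  // Hypothetical model of the non-IGNORE NULLS path: output row i reads
  // input row i + offset, or `default` when that index is out of range.
  def offsetLookup(
      input: Vector[Option[Int]],
      offset: Int,
      default: Option[Int]): Vector[Option[Int]] = {
    // Must start at `offset`; this is the state the prepare step is expected to set up.
    var inputIndex = offset
    input.map { _ =>
      val out =
        if (inputIndex >= 0 && inputIndex < input.length) input(inputIndex)
        else default // the offset row does not exist
      inputIndex += 1
      out
    }
  }
}
```

In this model, an offset larger than the row count needs no special case: every index is out of range, so every output row is the default. That only holds because the index was initialized first, which matches the objection raised in the comment above.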
Modified the PR to localize the change to the ignoreNulls case only; please take a look.
@beliefer please take a look.
@cloud-fan please take a look.
while (inputIndex < offset) {
  if (inputIterator.hasNext) inputIterator.next()
  inputIndex += 1
if (ignoreNulls) {
Seems we can just change this to if (ignoreNulls && Math.abs(offset) > rows.length)?
When offset > rows.length, it is safe to just set inputIndex to the offset value. I can add a conditional inside if (ignoreNulls) if you feel strongly about it :)
if (ignoreNulls) {
  findNextRowWithNonNullInput()
  if (Math.abs(offset) > rows.length) {
    inputIndex = offset
what does this do? skip everything and return default value?
Yes, it did, indirectly. To avoid confusion, I modified the change to limit its scope to the ignoreNulls case only. Please take a look.
@@ -201,7 +201,11 @@ class FrameLessOffsetWindowFunctionFrame(
  override def prepare(rows: ExternalAppendOnlyUnsafeRowArray): Unit = {
    resetStates(rows)
    if (ignoreNulls) {
      findNextRowWithNonNullInput()
      if (Math.abs(offset) > rows.length) {
We don't need Math.abs(offset) here. findNextRowWithNonNullInput skips all the rows if offset < 0.
I think the current code is clear. It's not obvious that findNextRowWithNonNullInput works for negative offsets, and I don't think perf is a problem here.
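The two guard conditions under discussion only differ for negative offsets. A tiny sketch of that difference, with `GuardConditionSketch`, `plainGuard`, and `absGuard` as hypothetical names introduced here for illustration:

```scala
object GuardConditionSketch {
  // Guard without Math.abs: can only fire for positive offsets.
  def plainGuard(offset: Int, rowCount: Int): Boolean = offset > rowCount

  // Guard with Math.abs: also fires for negative offsets whose magnitude
  // exceeds the row count.
  def absGuard(offset: Int, rowCount: Int): Boolean = math.abs(offset) > rowCount
}
```

For an offset of -5 over three rows, `plainGuard` is false and `absGuard` is true, so the absolute-value form short-circuits both directions explicitly instead of relying on how the scan treats negative offsets. Whether the scan already handles that case is exactly the point the reviewers disagree on, and this sketch does not settle it.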
I think the pyspark test failure is unrelated and we can merge. @vitaliili-db how far shall we backport? Is it a day-1 bug?
Yes, it is :(. Should go as far as 11.3
@cloud-fan can you merge this please?
… and offset > rowCount

### What changes were proposed in this pull request?

This is a fix for the failure when function that utilized `FramelessOffsetWindowFunctionFrame` is used with `ignoreNulls = true` and `offset > rowCount`.

e.g.

```
select x, lead(x, 5) IGNORE NULLS over (order by x) from (select explode(sequence(1, 3)) x)
```

### Why are the changes needed?

Fix existing bug

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Modify existing unit test to cover this case

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43236 from vitaliili-db/SPARK-45430.

Authored-by: Vitalii Li <vitalii.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 32e1e58)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
thanks, merging to master/3.5/3.4/3.3!
What changes were proposed in this pull request?
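This is a fix for the failure that occurs when a function that uses FrameLessOffsetWindowFunctionFrame is called with ignoreNulls = true and offset > rowCount, e.g.:
select x, lead(x, 5) IGNORE NULLS over (order by x) from (select explode(sequence(1, 3)) x)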
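As a reference for what the example query should return once the failure is gone, here is a plain-Scala model of lead with IGNORE NULLS. It is a sketch of the semantics as commonly described (the offset-th non-null value after the current row, else the default of NULL), not Spark's implementation; `LeadIgnoreNullsSketch` and `leadIgnoreNulls` are hypothetical names, and only positive offsets are modeled.

```scala
object LeadIgnoreNullsSketch {
  // Reference model: for each row i, return the offset-th non-null value
  // strictly after row i, or None when fewer than `offset` such values exist.
  def leadIgnoreNulls(values: Vector[Option[Int]], offset: Int): Vector[Option[Int]] = {
    require(offset > 0, "this sketch only models positive offsets")
    values.indices.toVector.map { i =>
      val following = values.drop(i + 1).flatten // non-null values after row i
      following.lift(offset - 1)
    }
  }
}
```

For the three rows 1, 2, 3 and an offset of 5, this model yields `None` for every row, which corresponds to a NULL lead value on each output row of the query.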
Why are the changes needed?
Fix existing bug
Does this PR introduce any user-facing change?
No
How was this patch tested?
Modify existing unit test to cover this case
Was this patch authored or co-authored using generative AI tooling?
No