[SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows #23634
Conversation
Code cleanup still needs to be done. I would like to get this reviewed regardless, since it looks like a correctness issue, and I would like to know early whether the approach I've taken is acceptable.
Test build #101620 has finished for PR 23634 at commit
Test build #101618 has finished for PR 23634 at commit
I think the bug here isn't in the third step, but in the second step:
This isn't valid even with a matched flag. If L1 isn't evicted, that means a new row L1' should still be able to match with R1, and therefore R1 can't be evicted either. The unit test seems to bear this out; the left side of the self-join is supposed to evict records 5 seconds behind the watermark, but it seems to be incorrectly waiting 10 seconds instead.
Seems like I need to add the query along with the edge case. (Just updated the description.)
No, the left side waited 5 seconds behind the watermark, whereas the right side didn't wait behind the watermark at all; the join condition is not symmetric between the two sides. Here's a physical plan from one of the batches while running a query similar to the UT in spark-shell:
Cropped to only the join information we want:
event time watermark = '1548219494841' (2019/01/23 13:58:14.841 GMT+09:00)
I think state eviction behind the watermark exists to wait for new rows; other conditions (like previously joined rows) should not matter. So we may need to focus on the watermark itself. Please let me know if I'm missing something here.
I think it's incorrect to evict R1 if we aren't also evicting all rows in L that R1 could have matched with. If it's just a matter of timestamps, is there some way to reproduce the issue without the self-join, so it's easier to analyze the test case?
Ah, never mind on that last question. I can reproduce it by just splitting up the input stream, although for some reason only 2 and not 4 appears to exhibit the problem.
My understanding is that rows are stored in state in order to be matched against new rows, not to track previous status. If we have to keep rows on the right side because the joined left-side rows are not yet evicted (which may also end up waiting for 5 seconds in the case above), we end up unnecessarily storing right-side rows (which can no longer be matched against new left-side rows) that could simply be replaced with a matched flag.
The thing is
I agree with that assessment, but my understanding is that they're supposed to be the same. I'll try to help find a third opinion here.
OK, thanks again for taking the time to look into the details. I was about to bring up another topic - backward compatibility with old state - but I'll postpone it (along with code cleanup) until we have agreed on the approach.
Shouldn't this be based on the output mode? In update mode it may be OK to emit updates for the same row; however, in append mode I would assume it should only emit the result after the event time passes the watermark threshold, so there should not be updates for the same row - is that the case? I think the confusion here is that the join predicate is also used to determine the watermark, whereas ideally it should be independent. Anyway, I think the events on both sides should be buffered until the event time passes the watermark threshold; otherwise it would not produce the right results. Immediately discarding one side's events implies we tie the watermark to the event time of that side without any delay, so late events on that side of the input are immediately discarded, while the input on the other side is buffered, which is wrong. So it seems there are two different watermarks here (one for each input), which seems wrong. Ideally the watermark should be tied to the operator (join) rather than having separate watermarks for each input, so that the operator can compute the result based on its watermark.
In this case it produces an incorrect result, because the matched row will be emitted first and the null-matched row will be emitted later, which may "overwrite" the result and treat the final result as null-matched. IMHO, regardless of output mode, the final result should be the same per key, and the same between batch and streaming. If the query were run as a batch query, the null-matched row would never be produced. So I'm not sure this is related to output mode.
As I commented earlier to Jose, the watermark for late tuples is the same across operators. The difference is when to evict rows from state, which I guess can depend on the join condition.
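For reference, the batch semantics being matched here can be stated as a tiny sketch over plain Scala collections (illustrative only): each left row is emitted exactly once, either with its matches or with null, never both.

```scala
// Reference semantics of a batch left outer join over in-memory collections.
// A left row that has at least one match never also produces a null-joined row.
def leftOuterJoin[L, R](left: Seq[L], right: Seq[R])(matches: (L, R) => Boolean): Seq[(L, Option[R])] =
  left.flatMap { l =>
    val rs = right.filter(r => matches(l, r))
    if (rs.isEmpty) Seq((l, None)) else rs.map(r => (l, Some(r)))
  }
```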
What I meant was: the rows in one of the inputs are immediately discarded, while I think both inputs should be retained until the watermark of the operator advances, so that the correct results can be produced.
The rows were stored to state on both sides in batch 1: they were just evicted from state in different batches (batch 3 for the left side, batch 2 for the right side). The global watermark was advanced in batch 2, so the right-side eviction in batch 2 was valid under the watermark condition. The "late" left-side eviction is just due to the join condition.
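To make the asymmetry concrete, here is a rough sketch of the eviction thresholds implied by a condition like leftTime <= rightTime AND rightTime <= leftTime + interval 5 seconds (illustrative only, not the exact predicates Spark generates):

```scala
// A left row with event time t can still match future right rows with time in [t, t + 5s],
// so it can only be evicted once the watermark passes t + 5s.
def canEvictLeft(leftTimeMs: Long, watermarkMs: Long): Boolean =
  leftTimeMs + 5000L < watermarkMs

// A right row with event time r can only match left rows with time in [r - 5s, r],
// all of which would already be late once the watermark passes r, so it can be evicted then.
def canEvictRight(rightTimeMs: Long, watermarkMs: Long): Boolean =
  rightTimeMs < watermarkMs
```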
Thanks for the explanation. So it seems the right-side rows are retained until the global watermark (min of the left and right watermarks), but the left-side rows are evicted later due to the join condition, and they then join with a 'null' on the right side since the right side was already evicted. Then what you are proposing (storing the matched state) makes sense, as long as it handles the different join types and conditions. If it does not, maybe we need to retain the right-side rows until the left-side rows are evicted.
Exactly. Thanks for spending the time to understand the details.
Test build #101863 has finished for PR 23634 at commit
Test build #101858 has finished for PR 23634 at commit
Test build #101865 has finished for PR 23634 at commit
I just realized the current approach is not compatible with old state, and it is non-trivial to address the issue. I would end up applying one of the following to support backward compatibility:
As we have been addressing state compatibility issues via 2), I would try to tackle 2) first.
Just applied approach 2. The code may have some redundant parts: not only the state format but also the logic has to be versioned, so it's not trivial to abstract them all. I'm planning to reduce the duplication, but I'd really appreciate it if someone could help me figure out where and how to clean up the code, or even restructure the packages/traits/classes. Thanks in advance to all reviewers!
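For illustration, backward compatibility via versioning might look roughly like the sketch below; the names (JoinStateFormat, formatForVersion) are hypothetical and not the actual classes in this PR:

```scala
// Hypothetical sketch: pick the state layout from a persisted format version, so that
// checkpoints written with the old layout keep being read with the old layout.
sealed trait JoinStateFormat
case object ValueOnly extends JoinStateFormat        // v1: the pre-existing layout
case object ValueWithMatched extends JoinStateFormat // v2: the new layout that also tracks "matched"

def formatForVersion(stateFormatVersion: Int): JoinStateFormat = stateFormatVersion match {
  case 1 => ValueOnly
  case 2 => ValueWithMatched
  case v => throw new IllegalArgumentException(s"Unsupported state format version: $v")
}
```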
Test build #101984 has finished for PR 23634 at commit
Kind reminder. While I agree we should take it seriously when dealing with state (since supporting backward compatibility for state is not easy), I think we should also put major and immediate effort into dealing with this correctness issue.
@HeartSaVioR Hey, thank you for taking up the challenge of fixing this complex bug, and my apologies for not being able to take a look at this earlier. I am trying to get up to speed on the existing conversation and look at the changes.
@HeartSaVioR This is a good, clean solution!!!! However, I think it can be simpler and more efficient. Unless I am missing something, I don't think there is a need for a third state store just to keep the matched boolean value. Why not make the KeyWithIndexToValueStateStore also keep the boolean flag? That is, the store would hold (key, index) -> (value, matched). Then we don't have to add another state store into the mix (each state store adds significant performance overhead from syncing to the checkpoint location).
And with this simplification, I don't think the JoinStateManager and StateHandlers need to be refactored SO MUCH. It's pretty scary to review so much change. I strongly suggest that you keep the current SymmetricHashJoinStateManager file roughly in the same structure and only make the following changes:
- Convert KeyWithIndexToValueStateStore to optionally store the matched flag (probably no need to rename it, which minimizes the diff).
- Change SymmetricHashJoinStateManager to take a boolean option for whether to use the matched flag. The rest of the code sets that flag on the store handlers only if the option is enabled. Since the number of stores does not change, I feel that the code changes needed to add matched support to SymmetricHashJoinStateManager should be much smaller than the current diff.
- Another, possibly future, change that this minimal refactoring would enable is that we don't have to enable the flag for both sides of the join. Only the left side of a left outer join needs the matched flag (and vice versa), and in fact inner joins do not need the matched flag at all. So it actually makes sense to add the matched flag as a purely optional feature within the existing SymmetricHashJoinStateManager.
Overall, I strongly suggest minimizing the diff visible in the PR so that it's easier to do a rigorous review. I haven't done a detailed pass due to the size of this diff. Let me know if all of these make sense.
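For concreteness, a minimal sketch of the value layout suggested above using Spark's StructType API; the input columns below are illustrative, not the real join schema:

```scala
import org.apache.spark.sql.types._

// The KeyWithIndexToValue store keeps, per (key, index), the original input row plus one
// extra boolean column recording whether the row has ever been matched with the other side.
val inputRowSchema: StructType = new StructType()
  .add("id", IntegerType)
  .add("time", TimestampType)

val valueWithMatchedSchema: StructType = inputRowSchema.add("matched", BooleanType)
```

Compared with a separate flag store, this keeps the number of state stores unchanged, at the cost of rewriting the whole value row whenever the flag flips.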
.withWatermark("fooTime", "5 seconds") | ||
.join( | ||
barStream.withWatermark("barTime", "5 seconds"), | ||
expr("barId = fooId AND barTime >= fooTime AND barTime <= fooTime + interval 5 seconds"), |
Why not stick to the naming convention of the existing tests and use "leftX" and "rightX"? It's much easier to read the code that way.
Totally agreed. I just took the query which the issue reporter originally provided as the reproducer. I'll change it.
assertNumStateRows(7, 7),

AddData(inputStream, (6, 6L), (7, 7L), (8, 8L), (9, 9L), (10, 10L)),
// batch 2 - global watermark = 5
This should be global watermark = 3 (= min(5, 3)), as the max timestamp on the left = 10 and on the right = 8.
If I'm not missing something here, the max timestamp on both sides is 10. Left and right only differ when the last value is odd. Please take a look at the result: (10, 10L, 10, 10L) is one of the matched results.
@tdas The lesson learned from my previous work #21733 was that reducing the size of the per-batch state diff performs better (in both size and time), despite the need for an additional projection. I considered both approaches: 1) add a boolean flag to the current (key, index) -> row store, 2) add a new state store that only stores the boolean flag. If we compare both approaches by state size, we can expect the following:
Given that we store the row as it is for the value part, flipping the matched flag in approach 1) rewrites the whole value row, whereas approach 2) only touches the tiny flag row, keeping the per-batch state diff smaller. If we take approach 2), refactoring the codebase is necessary to reduce huge code duplication: the current implementation doesn't seem designed for extensibility, and the change refactors the code to do its best to reduce duplication where the same code can be used in two places. I would be happy to take approach 1) in this PR and experiment with approach 2) later if we doubt its benefit. Please let me know which one we would prefer. Thanks in advance!
@tdas Does this mean you're suggesting not to version the state format for streaming join? The situation would be pretty different depending on which we choose.
I'm not sure it is safe to guarantee accessibility of the old state. Please shed some light (a sketched idea) on how to do it if you want to guide me here.
I think refactoring is still necessary if we version the state format (though we could flatten some interfaces and implementations into one), because we will end up having two StreamingJoinStateManagers where most things are duplicated but the implementation of KeyWithIndexToValueStore is a bit different. If we don't add an interface but only add a boolean flag to distinguish state formats, we would end up with state-format code being mixed up with the logic, which is complicated and hard to debug. Overall I totally understand the importance of reducing the code diff, but I'd hope we are not too concerned about a large diff if we find that introducing more code is worth it. (I mean I'd hope the amount of code change would not be the first thing to consider.) To evaluate the worth, it might be ideal to put aside the length of the code diff when deciding the general direction.
I would still suggest having the matched flag in any case: suppose we don't touch anything but change the query from an inner join to a left outer join - then theoretically the result could be incorrect, because we don't record whether rows were already matched.
I've changed the approach from having a 3rd state store to adding the matched field to the value of the 2nd store. Please take a look at the last commit I made to change the approach: 912a250. Though it didn't roll back the refactoring I've done for state versioning, it gives a view of the difference between a 3rd state store and a matched field in the 2nd store. (Some projections were necessary to get the original row back from the value side.) Honestly, this commit demonstrates why the refactoring was necessary: fewer than 200 lines (added + removed) were needed to apply the suggested new approach, while keeping versioning for state formats. The change is done only on the state handler side and doesn't touch any part of the join logic.
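The projections mentioned above can be pictured with a simplified sketch over Row (the actual code works with UnsafeRow/UnsafeProjection; the names here are illustrative):

```scala
import org.apache.spark.sql.Row

// Writing: append the matched flag to the original value row.
def toStoredValue(original: Row, matched: Boolean): Row =
  Row.fromSeq(original.toSeq :+ matched)

// Reading: strip the trailing flag to recover the original row, and read the flag itself.
def fromStoredValue(stored: Row): (Row, Boolean) =
  (Row.fromSeq(stored.toSeq.dropRight(1)), stored.getBoolean(stored.length - 1))
```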
Test build #102506 has finished for PR 23634 at commit
@HeartSaVioR I will try to take a look soon, as soon as I find the time to do a detailed pass. Please bear with me :)
@tdas No problem! Happy to see we reached consensus on the general direction to fix the issue (the matched flag). Please ping me once you find the time to provide feedback. I'll play with other issues in the meantime.
@tdas
Ping. |
Kind reminder.
Ping. |
Ping again, as Spark+AI Summit 2019 in SF has ended.
…for already matched rows
* Remove unused method
* Address backward compatibility with old state
* Do some refactoring
* address backward compatibility via introducing state format version
* Introduce helper object to deduplicate long-code
* Add 'matched' field in value type of state store instead of adding one more state store
* Normalize names as left & right for join tests
Test build #105357 has finished for PR 23634 at commit
@tdas Kind reminder, as this is a correctness issue. cc @zsxwing, @jose-torres, and @gaborgsomogyi as well.
Ping. |
ok to test |
Test build #110692 has finished for PR 23634 at commit
Test build #110954 has finished for PR 23634 at commit
cc @tdas @zsxwing @jose-torres @gaborgsomogyi I would really appreciate it if we could move this forward before the 3.0 tech preview.
I just revisited this and reduced a large amount of the diff by reducing the scope of the refactoring. The revised patch is available at #26108. Closing this.
What changes were proposed in this pull request?
This patch fixes an edge case in streaming left/right outer joins, described below:
Suppose the query is provided as
select * from A join B on A.id = B.id AND (A.ts <= B.ts AND B.ts <= A.ts + interval 5 seconds)
and there are two rows L1 (from A) and R1 (from B) such that L1.id = R1.id and L1.ts = R1.ts.
(We can simply imagine this via a self-join.)
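For reference, a rough DataFrame-API sketch of this shape of query, using a left outer join as an example (A and B are assumed to be streaming DataFrames with columns id and ts, and the 5-second watermarks mirror the test excerpt above; this is illustrative, not the actual test code):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.expr

// a and b are assumed to be streaming DataFrames with columns (id, ts).
def timeBoundOuterJoin(a: DataFrame, b: DataFrame): DataFrame = {
  val left = a.toDF("leftId", "leftTs").withWatermark("leftTs", "5 seconds")
  val right = b.toDF("rightId", "rightTs").withWatermark("rightTs", "5 seconds")
  left.join(
    right,
    expr("leftId = rightId AND leftTs <= rightTs AND rightTs <= leftTs + interval 5 seconds"),
    "left_outer")
}
```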
Then Spark processes L1 and R1 as below:
When determining which outer rows to join with null, Spark applies an assumption commented in the codebase, as below:
But as the edge case explained earlier shows, the assumption is not correct. As we don't have any good assumption to optimize with that avoids this edge case, we have to track whether each such row has been matched with others before, and join it with a null row only when it has never been matched.
To track the matching of rows, the patch adds new state to the streaming join state manager that marks whether each row has been matched to others or not. We leverage this information when handling eviction of rows that would otherwise be candidates to join with null rows.
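Conceptually, the eviction side changes roughly as in this sketch (plain Scala values, not the actual state manager code; StoredRow is an illustrative name):

```scala
// Each buffered row now carries a flag that is flipped to true whenever the row joins
// with an incoming row from the other side.
case class StoredRow[T](value: T, matched: Boolean)

// When the watermark allows rows to be evicted, only rows that never matched are emitted
// joined with null; rows that already produced a match are dropped silently instead of
// producing a second, null-joined output.
def rowsToEmitWithNull[T](evicted: Seq[StoredRow[T]]): Seq[T] =
  evicted.collect { case StoredRow(value, matched) if !matched => value }
```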
This approach introduces a new state format which is not compatible with the old one - queries with the old state format will keep running, but they will still have the issue and will need to discard their checkpoint and rerun for this patch to take effect.
How was this patch tested?
Added a UT which fails on current Spark and passes with this patch. Existing streaming join UTs also pass.