[SPARK-28120][SS] Rocksdb state storage implementation #24922
Conversation
<!-- RocksDB dependency for Structured Streaming State Store -->
<dependency>
  <groupId>org.rocksdb</groupId>
  <artifactId>rocksdbjni</artifactId>
This dependency has all the files packed for all major OSs. Flink uses a custom build. Digging into this a bit more, I see some additions/modifications as described here. I understand this is Flink-specific, but how about the TTL thing mentioned there? https://issues.apache.org/jira/browse/FLINK-10471 looks interesting. Structured Streaming fetches all state here (in memory) and filters out the timed-out ones; is RocksDB performing well there? Shouldn't we have the same mechanism, or a similar one, so we don't fetch everything and instead delegate this to the state backend (which could run in the background, btw)?
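To make the concern concrete, here is a minimal sketch (all names are illustrative, not Spark's actual API) of the engine-side timeout handling described above: the engine scans every state entry each micro-batch and evicts the expired ones itself, which is O(total state) even when few keys time out. A TTL-aware backend could instead drop expired entries in the background (e.g. at compaction time, as in the Flink ticket).

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical sketch of engine-side timeout filtering: the engine pulls
// every state entry and filters out the timed-out ones itself.
public class TimeoutScanSketch {
    static final class StateValue {
        final String data;
        final long timeoutTimestampMs;   // when this key's state expires
        StateValue(String data, long timeoutTimestampMs) {
            this.data = data;
            this.timeoutTimestampMs = timeoutTimestampMs;
        }
    }

    // Engine-side filtering: visits every entry, every micro-batch.
    static int evictTimedOut(Map<String, StateValue> store, long nowMs) {
        int evicted = 0;
        Iterator<Map.Entry<String, StateValue>> it = store.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().timeoutTimestampMs <= nowMs) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    public static void main(String[] args) {
        Map<String, StateValue> store = new HashMap<>();
        store.put("a", new StateValue("x", 100L));
        store.put("b", new StateValue("y", 500L));
        System.out.println(evictTimedOut(store, 200L)); // only "a" has expired
        System.out.println(store.size());
    }
}
```

Delegating this filter to the backend would avoid materializing all state in the JVM just to discard part of it.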
I will take a look at the Flink build and see if I can pick only relevant packages in rocksdb dependency.
IMO, abstracting out how state backend should filter out timed out states can be treated as a separate problem so that we don't end up increasing the scope of this PR. Once the abstraction is added, we can file a separate jira to implement it for rocksdb state backend.
ok makes sense.
RocksDB might not be the best backend. Instead of adding the extra dependency, I think we should just do it as a separate third-party package. The community can always build their own backend based on their needs. Doing it is simple.
Can you submit it to https://spark-packages.org/?
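For context on the external-package route: Spark already exposes a configuration knob for swapping the state store implementation, so a deployment could select a third-party backend without code changes. A hypothetical config fragment (the provider class name is the one proposed in this PR; it is not part of upstream Spark):

```
spark.sql.streaming.stateStore.providerClass=org.apache.spark.sql.execution.streaming.state.RocksDbStateStoreProvider
```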
@gatorsmile - what are the alternatives if RocksDB is not the best backend? Other streaming technologies such as Flink and KStreams use RocksDB as the primary storage engine.
With integration in the Spark codebase, we can probably change the code in any way later, but if we take the separate-jar route, the kind of extensions you can make is limited by the current contract. For example, @skonto mentioned one way we could abstract the state storage implementation to get the best out of RocksDB. How can we support such improvements if we take the spark-packages route?
The current implementation based on an in-memory hashmap is not scalable beyond a point. How shall we go about solving it?
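As a toy illustration (not Spark's actual code) of why the in-memory approach is memory-bound: the default provider keeps recent versions of the key-value map in executor memory, so resident state grows with both the key count and the number of retained versions, all inside the executor JVM heap.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a versioned in-memory state store: every committed batch
// retains a full copy of the map, so memory use grows with
// retainedVersions * keys, all on the executor heap.
public class VersionedMapSketch {
    private final Map<Long, Map<String, String>> versions = new HashMap<>();

    // Start a new version as a copy of the previous one, apply updates, commit.
    public void commitBatch(long version, Map<String, String> updates) {
        Map<String, String> base = versions.getOrDefault(version - 1, new HashMap<>());
        Map<String, String> next = new HashMap<>(base);  // full copy kept in memory
        next.putAll(updates);
        versions.put(version, next);
    }

    public int residentEntries() {
        return versions.values().stream().mapToInt(Map::size).sum();
    }

    public static void main(String[] args) {
        VersionedMapSketch store = new VersionedMapSketch();
        Map<String, String> updates = new HashMap<>();
        for (int i = 0; i < 1000; i++) updates.put("key" + i, "v");
        for (long v = 1; v <= 10; v++) store.commitBatch(v, updates);
        // 10 retained versions x 1000 keys = 10000 resident entries
        System.out.println(store.residentEntries());
    }
}
```

A disk-backed store such as RocksDB keeps only a bounded working set in memory, which is the scalability argument being made here.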
ping @arunmahadevan
ok to test
Thank you for your first contribution, @itsvikramagr !
Test build #106795 has finished for PR 24922 at commit
@itsvikramagr thanks for the hard work! The presentation rocks and helps to understand the design, but it will take some time to analyze things properly. I think it's a good feature, so I will review it thoroughly.
Thanks @gaborgsomogyi
@itsvikramagr could you fix the scala style error?
Test build #106864 has finished for PR 24922 at commit
Test build #106865 has finished for PR 24922 at commit
Thanks for the hard work, @itsvikramagr ! I agree keeping state in memory is not scalable, and the result looks promising. It might be better to have another kind of benchmark here, like a stress test, to see the performance on stateful operations and let end users judge whether they're mostly encouraged to use this implementation, or to use it selectively. What I did for my patch was the following: Btw, it will take some time to review your patch, as the diff is 2000+ lines and I also have some work on my plate. You might want to spend time getting familiar with the style guide if you haven't - there are some rules which are not checked by the scala style check but which reviewers will point out.
And please take a deep look at the build result if it fails, and try to fix the failure if it is related to your patch. In some cases the build output log has a guide message for fixing the issue, like in this case.
Thanks, @HeartSaVioR. I understand it is a very big change. As suggested, let me create a stress test suite and paste some benchmark numbers.
FYI: just thinking out loud since I'm being cc-ed first - I'm just one of the contributors, not a committer or PMC member of Apache Spark. In case you get confused by the "MEMBER" badge: the badge just means I'm a committer in "any" of the ASF projects.
Test build #106869 has finished for PR 24922 at commit
Looking at the unit test failures. They're related to the rocksDbPath folder name. Will make it configurable and update the PR.
Test build #109878 has finished for PR 24922 at commit
Test build #109897 has finished for PR 24922 at commit
Test build #109904 has finished for PR 24922 at commit
ping @gaborgsomogyi @dongjoon-hyun @HeartSaVioR - I have a comment from @gaborgsomogyi to resolve. Is there anything else I should be doing to get this patch into the 3.0 release?
@itsvikramagr are you planning to resolve the remaining comments, or are you waiting on a second opinion? I think the config is not yet resolved.
Can one of the admins verify this patch?
I was waiting for more comments. I think I have given enough time for them. Will fix the config changes and any other pending changes over the weekend.
I repackaged this and plugged it into a Spark 2.4.3 system (https://github.com/PingHao/spark-statestore-rocksdb). Here are my observations:
2019-10-07 15:19:57.968Z ERROR [Executor task launch worker for task 65] org.apache.spark.util.Utils - Aborting task
I fixed this by changing RocksDbStateStoreProvider.scala.
On host C, trying to download from host A, where the snapshot was created:
[geo@plno-cto-fx2s1a l]$ grep 52786 executor.log
On the spark state directory:
[geo@plno-cto-fx2s1a 149]$ tar tvf 257.snapshot
--- update on No.3
Will update the PR with the fix
For Isolation and Data consistency, we checkpoint the rocksdb state to local disk. As you have suggested a good file system and SSD based instance storage should be used to get the best performance.
Great catch. Let me look at it and make appropriate changes.
Thanks. Another issue found during an overnight run is a sporadic core dump; Spark then detects it and starts a new executor. It happens on all 3 Spark worker nodes, at intervals of several hours. The backtrace shows it's caused by the destroyDB JNI call. Details as follows: [Thread debugging using libthread_db enabled] Core was generated by `/usr/local/openjdk-11/bin/java -cp /spark/conf/:/spark/jars/:/hadoop/etc/hadoo'.
@PingHao - Both issues (the stuck executor and the core dump) might have the same root cause. Will debug and fix it. Do you have some code snippets that can help me reproduce the problem?
I can produce the core dump on my developer machine, but I'm not sure if the logic is legit anyway.
Left my comment #24922 (comment)
First of all, I think this is great. Thanks for working on it! I tend to agree with @gatorsmile that we should consider making this feature an external package. The argument here is not that RocksDB is a bad choice; for many workloads it is likely a great option. However, for some workloads people might want to use Cassandra, or LevelDB, or something else. Should we also allow new features to add dependencies on those? As the Spark project continues to grow, I think it is important that we guard against the core becoming a swiss army knife, with too many different configurations for the community to maintain in the long run. In this case we are not only adding a new dependency, but we are also committing Spark to supporting the specifics of how you are packaging and uploading the RocksDB files forever. The whole reason we added an API for state stores to plug in was to enable this kind of innovation outside of the core of Spark. If this package becomes super popular, I would reconsider this position, similar to how avro and csv were eventually inlined into core Spark.
If the abstraction boundaries are wrong here, we should improve the APIs, not punch through them. I don't think this is a good argument for putting this into Spark.
@marmbrus / @gatorsmile - Got your point on making it an external package. I will close the PR and the corresponding JIRA, and will update this thread once I have submitted it to https://spark-packages.org/. Thanks, @gaborgsomogyi @HeartSaVioR @dongjoon-hyun @PingHao for looking into the PR and helping me make significant improvements to it.
@PingHao - I will fix the issue and post a message in your repo (https://github.com/PingHao/spark-statestore-rocksdb).
@marmbrus @itsvikramagr Flink has supported this at the right level for them (https://github.com/apache/flink/tree/master/flink-state-backends). These backends don't change much, and the benefits of using RocksDB with checkpointing for large states are important, at least for Flink, which puts a lot of emphasis on stateful apps. There is always a trade-off anyway. Having APIs is the way to go for many parts, but a default, reference implementation is also helpful, especially if it makes a difference. Packages outside of the core project could become outdated fast, I'm afraid.
I have released the code here (https://github.com/qubole/spark-state-store). The Mvn artifact can be found here: https://mvnrepository.com/artifact/com.qubole.spark/spark-rocksdb-state-store.
Thank you for the update, @itsvikramagr .
https://docs.databricks.com/spark/latest/structured-streaming/production.html
"As the Spark project continues to grow, I think it is important that we guard against the core becoming a swiss army knife, with too many different configurations for the community to maintain in the long run. In this case we are not only adding a new dependency, but we are also committing the Spark to supporting the specifics of how you are packaging and uploading the RocksDB files forever."
A RocksDB state store has "found its way" into a commercial offering, so it should "find its way" into mainline Spark. It is obviously being packaged as a feature by Databricks (thus proving its value). What will happen if Spark tries to "tiptoe the open-source fence"? Folks will notice, and they will simply switch to Kafka Streams, which is hungry for user base and offers a RocksDB state store OUT OF THE BOX. This is actually a conversation I had today: someone saying "Spark can't do it, let's use Kafka Streams or Flink".
What changes were proposed in this pull request?
SPARK-13809 introduced a framework for state management for computing streaming aggregates. The default implementation is an in-memory hashmap, which is backed up to an HDFS-compliant file system at the end of every micro-batch.
The current implementation suffers from performance and latency issues, since it uses executor JVM memory to store the states:
- State store size is limited by the size of the executor memory.
- Executor JVM memory is shared by state storage and other task operations, so state storage size impacts the performance of task execution.
- Moreover, GC pauses, executor failures, and OOM issues become common as the size of the state storage increases, which increases the overall latency of a micro-batch.
RocksDB is a storage engine with a key/value interface, based on LevelDB. New writes are inserted into an in-memory memtable; when the memtable fills up, it is flushed to local storage. RocksDB supports both point lookups and range scans, provides different types of ACID guarantees, and is optimized for flash storage.
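The write path just described can be sketched with a deliberately simplified model (illustrative only, not RocksDB's actual code): writes land in a sorted in-memory memtable; when it fills up, it is frozen and "flushed" as an immutable sorted run, and point lookups check the memtable first, then the runs from newest to oldest.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.TreeMap;

// Minimal sketch of an LSM-style write path: a sorted memtable that is
// flushed as an immutable sorted run when full. The in-memory runs here
// stand in for SST files on local storage.
public class LsmSketch {
    private final int memtableLimit;
    private TreeMap<String, String> memtable = new TreeMap<>();
    // Newest flushed run first, so reads see the latest value for a key.
    private final Deque<TreeMap<String, String>> flushedRuns = new ArrayDeque<>();

    LsmSketch(int memtableLimit) { this.memtableLimit = memtableLimit; }

    public void put(String key, String value) {
        memtable.put(key, value);
        if (memtable.size() >= memtableLimit) {
            flushedRuns.addFirst(memtable);   // freeze and "flush"
            memtable = new TreeMap<>();
        }
    }

    // Point lookup: memtable first, then runs newest-to-oldest.
    public String get(String key) {
        String v = memtable.get(key);
        if (v != null) return v;
        for (TreeMap<String, String> run : flushedRuns) {
            v = run.get(key);
            if (v != null) return v;
        }
        return null;
    }

    public static void main(String[] args) {
        LsmSketch db = new LsmSketch(2);
        db.put("a", "1");
        db.put("b", "2");   // second write triggers a flush of {a=1, b=2}
        db.put("a", "3");   // newer value for "a" lives in the fresh memtable
        System.out.println(db.get("a")); // newest value wins
        System.out.println(db.get("b")); // served from the flushed run
    }
}
```

Real RocksDB adds write-ahead logging, compaction, and bloom filters on top of this basic shape.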
In this PR I have implemented RocksDB-based state storage for Structured Streaming, which will provide major performance improvements for stateful stream processing.
Implementation details
Creation of a new state store (for batch x and partition Pi)
Otherwise, create a new RocksDB store using the checkpointed data (snapshot + delta)
During Batch Execution
Snapshot creation (Maintenance task)
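The recovery path above (rebuilding a store from checkpointed snapshot + delta data) can be sketched as follows. This is an illustrative model, not the PR's code: the store is restored by loading the last full snapshot and then replaying each per-batch delta recorded after it, in order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of snapshot + delta recovery: start from the last
// full snapshot, then replay per-batch deltas in commit order.
public class SnapshotDeltaRecovery {
    // Convention for this sketch: a null value in a delta means the key
    // was removed in that batch.
    static Map<String, String> restore(Map<String, String> snapshot,
                                       List<Map<String, String>> deltas) {
        Map<String, String> state = new HashMap<>(snapshot);
        for (Map<String, String> delta : deltas) {
            for (Map.Entry<String, String> e : delta.entrySet()) {
                if (e.getValue() == null) state.remove(e.getKey());
                else state.put(e.getKey(), e.getValue());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, String> snapshot = new HashMap<>();
        snapshot.put("a", "1");
        snapshot.put("b", "2");

        Map<String, String> delta1 = new HashMap<>();
        delta1.put("a", "10");        // update in batch 1
        Map<String, String> delta2 = new HashMap<>();
        delta2.put("b", null);        // delete in batch 2
        delta2.put("c", "3");         // insert in batch 2

        List<Map<String, String>> deltas = new ArrayList<>();
        deltas.add(delta1);
        deltas.add(delta2);

        Map<String, String> state = restore(snapshot, deltas);
        System.out.println(state.get("a") + " " + state.get("b") + " " + state.get("c"));
    }
}
```

Periodic snapshot creation (the maintenance task) bounds how many deltas must be replayed on recovery.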
A detailed presentation on the implementation can be found here
How was this patch tested?
Added new unit tests which extend the existing abstract class StateStoreSuiteBase. (There is one UT failure, related to metrics reporting in the UT framework; looking at it.)