
[SPARK-28120][SS] Rocksdb state storage implementation #24922

Closed
wants to merge 28 commits into from

Conversation

itsvikramagr

What changes were proposed in this pull request?

SPARK-13809 introduced a framework for state management for computing streaming aggregates. The default implementation is an in-memory hashmap that is backed up to an HDFS-compliant file system at the end of every micro-batch.

The current implementation suffers from performance and latency issues. It stores state in executor JVM memory, so the state store size is limited by the executor memory. Because that memory is shared between state storage and task execution, a large state store also degrades task performance.

Moreover, GC pauses, executor failures, and OOM errors become common as the state grows, which increases the overall latency of a micro-batch.

RocksDB is a storage engine with a key/value interface, based on LevelDB. New writes are inserted into a memtable; when the memtable fills up, the data is flushed to local storage. It supports both point lookups and range scans, provides configurable ACID guarantees, and is optimized for flash storage.
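For readers new to the rocksdbjni API, a minimal put/get example (standalone, not code from this PR) looks roughly like this:

import org.rocksdb.{Options, RocksDB}

// Sketch only: open a local RocksDB instance, write a key, and read it back.
object RocksDbQuickstart {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary() // load the native library bundled in rocksdbjni
    val options = new Options().setCreateIfMissing(true)
    val db = RocksDB.open(options, "/tmp/rocksdb-quickstart") // illustrative path
    try {
      db.put("key".getBytes("UTF-8"), "value".getBytes("UTF-8")) // lands in the memtable first
      val value = db.get("key".getBytes("UTF-8"))                // point lookup
      println(new String(value, "UTF-8"))
    } finally {
      db.close()
      options.close()
    }
  }
}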

In this PR I have implemented a RocksDB-based state store for Structured Streaming, which provides major performance improvements for stateful stream processing.

Implementation details

Creation of new State (For batch x and partition Pi)

  • If Node(Pi, x) = Node(Pi, x-1): the state is already loaded in RocksDB.
  • Else if Node(Pi, x) = Node(Pi, x-2): update the RocksDB state by applying the downloaded Delta(Pi, x-1).
  • Otherwise, create a new RocksDB store from the checkpointed data (snapshot + deltas). (A sketch of this decision follows the list.)
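A minimal sketch of the load decision (the version-tracking map and the applyDelta / restoreFromCheckpoint hooks are illustrative placeholders, not the PR's actual API):

import scala.collection.mutable

// Sketch only: track the state version currently materialized in the local RocksDB
// instance for each partition, and decide how to reach the requested version.
val localVersion = mutable.Map.empty[Int, Long]

def loadStore(partition: Int,
              version: Long,
              applyDelta: (Int, Long) => Unit,           // replay one delta file from the checkpoint
              restoreFromCheckpoint: (Int, Long) => Unit // rebuild from snapshot + newer deltas
             ): Unit = {
  localVersion.get(partition) match {
    case Some(v) if v == version - 1 =>
      () // same executor ran the previous batch, so RocksDB already holds the state
    case Some(v) if v == version - 2 =>
      applyDelta(partition, version - 1)
    case _ =>
      restoreFromCheckpoint(partition, version)
  }
  localVersion(partition) = version
}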

During Batch Execution

  • Open RocksDB in transactional mode.
  • On successful completion of the batch, commit the transaction, upload the delta file to the checkpoint folder, and create a backup of the current DB state in local storage (sketched below).
  • Abort the transaction on any error.
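A rough sketch of this flow using the rocksdbjni transactional API (the uploadDelta and backupLocally hooks are hypothetical placeholders, not the PR's code):

import org.rocksdb.{OptimisticTransactionDB, Options, Transaction, WriteOptions}

// Sketch only: run one micro-batch's state updates inside an optimistic transaction.
def runBatch(dbPath: String,
             updates: Transaction => Unit,
             uploadDelta: () => Unit,    // hypothetical: ship the delta file to the checkpoint folder
             backupLocally: () => Unit   // hypothetical: back up the current DB state on local storage
            ): Unit = {
  val options = new Options().setCreateIfMissing(true)
  val db = OptimisticTransactionDB.open(options, dbPath)
  val txn = db.beginTransaction(new WriteOptions())
  try {
    updates(txn)     // the task writes its state changes inside the transaction
    txn.commit()     // make the batch's changes visible
    uploadDelta()
    backupLocally()
  } catch {
    case e: Throwable =>
      txn.rollback() // abort the transaction on any error
      throw e
  } finally {
    txn.close()
    db.close()
    options.close()
  }
}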

Snapshot creation (Maintenance task)

  • Create a tarball of the last backed-up DB state and upload it to the checkpoint folder (sketched below).
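A minimal sketch of the maintenance step (the tar invocation and the target layout are illustrative; the PR's actual packaging and upload code may differ, e.g. it may use the Hadoop FileSystem API):

import java.nio.file.{Files, Path}

// Sketch only: package the last local backup into <version>.snapshot and copy it to the checkpoint folder.
def snapshot(backupDir: Path, checkpointDir: Path, version: Long): Unit = {
  val tarball = Files.createTempFile(s"state-$version-", ".snapshot")
  val exit = new ProcessBuilder("tar", "-cf", tarball.toString, "-C", backupDir.toString, ".")
    .inheritIO()
    .start()
    .waitFor()
  require(exit == 0, s"tar failed with exit code $exit")
  Files.copy(tarball, checkpointDir.resolve(s"$version.snapshot"))
}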

A detailed presentation on the implementation can be found here

How was this patch tested?

Added new unit tests that extend the existing abstract class StateStoreSuiteBase. (There is one UT failure related to metrics reporting in the UT framework; looking into it.)

<!-- RocksDB dependency for Structured Streaming State Store -->
<dependency>
  <groupId>org.rocksdb</groupId>
  <artifactId>rocksdbjni</artifactId>
Contributor

@skonto skonto Jun 20, 2019


This dependency has all the files packed for all major OSs. Flink uses a custom build. Digging into this a bit more, I see some additions/modifications as described here. I understand this is Flink-specific, but how about the TTL thing mentioned there? https://issues.apache.org/jira/browse/FLINK-10471 looks interesting. Structured Streaming fetches all state here (in memory) and filters out the timed-out ones; is RocksDB performing well there? Shouldn't we have the same mechanism or a similar one, so we don't fetch everything and instead delegate this to the state backend (which could run in the background, btw)?

Author


I will take a look at the Flink build and see if I can pick only relevant packages in rocksdb dependency.

IMO, abstracting out how the state backend should filter out timed-out states can be treated as a separate problem, so that we don't end up increasing the scope of this PR. Once the abstraction is added, we can file a separate JIRA to implement it for the RocksDB state backend.

Contributor


ok makes sense.

Member


RocksDB might not be the best backend. Instead of adding the extra dependency, I think we should just do it as a separate third-party package. The community can always build their own backend based on their needs. Doing it is simple.

Can you submit it to https://spark-packages.org/?

cc @marmbrus @tdas @zsxwing

Author


@gatorsmile - what are the alternatives if RocksDB is not the best backend? Other streaming technologies, such as Flink and Kafka Streams, use RocksDB as their primary storage engine.

With integration in the Spark codebase, we can probably change the code in any way later, but if we take the separate-jar route, the kinds of extensions you can make are limited by the current contract. For example, @skonto mentioned one way we could abstract the state storage implementation to get the best out of RocksDB. How can we support such improvements if we take the Spark package route?

The current implementation, based on an in-memory hashmap, is not scalable beyond a point. How shall we go about solving that?

@itsvikramagr itsvikramagr changed the title [Spark 28120][SS] Rocksdb state storage implementation [SPARK-28120][SS] Rocksdb state storage implementation Jun 21, 2019
@itsvikramagr
Author

ping @HeartSaVioR @tdas @jose-torres

@itsvikramagr
Author

ping @arunmahadevan

@dongjoon-hyun
Member

ok to test

@dongjoon-hyun
Member

Thank you for your first contribution, @itsvikramagr !

@SparkQA

SparkQA commented Jun 22, 2019

Test build #106795 has finished for PR 24922 at commit 292befe.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gaborgsomogyi
Contributor

gaborgsomogyi commented Jun 24, 2019

@itsvikramagr thanks for the hard work! The presentation rocks and helps in understanding the design, but it will take some time to analyze things properly. I think it's a good feature, so I will review it thoroughly.
Initially I have these suggestions/questions:

  • Please resolve the style problems (not just the ones which made the build fail; please see the contribution guide for details. As an example, the indentation is consistently incorrect when parameters are broken across multiple lines.)
  • Do we have any numbers about the performance impact?
  • Regarding the testing part: do I understand correctly that you've not tested it on a cluster with heavy load (and maybe with artificial exceptions)?

@itsvikramagr
Author

Thanks @gaborgsomogyi

  • Will fix the style problems ASAP and update the PR.
  • In my test setup, I was able to scale to more than 250 million keys using just 2 i3.xlarge executor nodes, by running a group-by aggregation query on a campaign data source generated with the rate source.
    I stopped my experiment after 5 hours. GC time was about 1.5% of the total task time (see the attached charts). In the same setup, the default implementation crashed after creating 35 million new state keys. (A sketch of such a benchmark query follows the charts below.)
  • I ran my experiments with varying load and under different stress conditions. Please recommend more scenarios you think I should be testing.

executor-memory-usage

state-store-rows
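A sketch of the kind of benchmark query described above, using the rate source with a group-by aggregation (the column name, rate, sink, and checkpoint path are illustrative, not the exact test setup):

import org.apache.spark.sql.SparkSession

// Sketch only: synthesize a high-cardinality key from the rate source and aggregate on it.
object StateStoreBenchmark {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("rocksdb-state-store-benchmark").getOrCreate()
    import spark.implicits._

    val counts = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "100000")
      .load()
      .withColumn("campaignId", $"value" % 250000000L) // many distinct keys -> large state
      .groupBy($"campaignId")
      .count()

    counts.writeStream
      .outputMode("update")
      .format("console")
      .option("checkpointLocation", "/tmp/state-store-benchmark") // illustrative path
      .start()
      .awaitTermination()
  }
}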

@felixcheung
Member

felixcheung commented Jun 25, 2019

@itsvikramagr could you fix the scala style error?

/home/jenkins/workspace/SparkPullRequestBuilder/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDbInstance.scala:208:0: Whitespace at end of line

@SparkQA

SparkQA commented Jun 25, 2019

Test build #106864 has finished for PR 24922 at commit 3f5f6b2.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 25, 2019

Test build #106865 has finished for PR 24922 at commit f0f2f8d.

  • This patch fails build dependency tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HeartSaVioR
Contributor

Thanks for the hard work, @itsvikramagr !

I agree that keeping state in memory is not scalable, and the results look promising. It might be better to have another kind of benchmark here, like a stress test, to see the performance of stateful operations and to guide end users on whether they're generally encouraged to use this implementation, or only selectively.

What I did for my patch was the following:
https://issues.apache.org/jira/browse/SPARK-21271
#21733 (comment)

Btw, it will take some time to review your patch as the diff is 2000+ lines, and I also have some work on my plate. You might want to spend time getting familiar with the style guide if you haven't - there are some rules which are not checked via the Scala style check but which reviewers will point out.

@HeartSaVioR
Contributor

And please take a close look at the build result if it fails, and try to fix it if the build failure is related to your patch. In some cases, the build output log has a guide message for fixing the issue, like in this case.

Spark's published dependencies DO NOT MATCH the manifest file (dev/spark-deps).
To update the manifest file, run './dev/test-dependencies.sh --replace-manifest'.
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/pr-deps/spark-deps-hadoop-2.7
index 62b00f3..7e33e82 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/pr-deps/spark-deps-hadoop-2.7
@@ -171,6 +171,7 @@ parquet-jackson-1.10.1.jar
 protobuf-java-2.5.0.jar
 py4j-0.10.8.1.jar
 pyrolite-4.23.jar
+rocksdbjni-6.0.1.jar
 scala-compiler-2.12.8.jar
 scala-library-2.12.8.jar
 scala-parser-combinators_2.12-1.1.0.jar

@itsvikramagr
Author

Thanks, @HeartSaVioR. I understand it is a very big change. As suggested, let me create a stress test suite and paste some benchmark numbers.

@HeartSaVioR
Contributor

FYI: just thinking out loud since I'm being cc-ed first. I'm just one of the contributors, not a committer or PMC member of Apache Spark. In case you get confused by the "MEMBER" badge: it just means I'm a committer in "any" of the ASF projects.

@SparkQA

SparkQA commented Jun 25, 2019

Test build #106869 has finished for PR 24922 at commit 827ace4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@itsvikramagr
Author

Looking at the unit test failures. It's related to the rocksDbPath folder name. Will make it configurable and update the PR.

@SparkQA

SparkQA commented Aug 28, 2019

Test build #109878 has finished for PR 24922 at commit c38bd6c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 29, 2019

Test build #109897 has finished for PR 24922 at commit fb86f0d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 29, 2019

Test build #109904 has finished for PR 24922 at commit 4544abc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@itsvikramagr
Author

ping @gaborgsomogyi @dongjoon-hyun @HeartSaVioR

I have a comment from @gaborgsomogyi to resolve. Is there anything else I should be doing to get this patch into the 3.0 release?

@gaborgsomogyi
Contributor

@itsvikramagr are you planning to resolve the remaining comments or waiting on a second opinion? I think the config is not yet resolved.

@AmplabJenkins

Can one of the admins verify this patch?

@itsvikramagr
Author

itsvikramagr commented Sep 20, 2019

@itsvikramagr are you planning to resolve the remaining comments or waiting on a second opinion? I think the config is not yet resolved.

I was waiting for more comments, and I think I have given it enough time. Will fix the config changes and any other pending changes over the weekend.

@PingHao

PingHao commented Oct 8, 2019

I repackaged this and plugged it into a Spark 2.4.3 system (https://github.com/PingHao/spark-statestore-rocksdb). Here are my observations:

  1. We are using flatMapGroupsWithState, and it causes a failure at the beginning:

2019-10-07 15:19:57.968Z ERROR [Executor task launch worker for task 65] org.apache.spark.util.Utils - Aborting task
java.lang.IllegalArgumentException: requirement failed: Cannot getRange after already committed or aborted
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider$RocksDbStateStore.getRange(RocksDbStateStoreProvider.scala:149)
at org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getAllState(FlatMapGroupsWithStateExecHelper.scala:107)
at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor.processTimedOutState(FlatMapGroupsWithStateExec.scala:181)
at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$doExecute$3(FlatMapGroupsWithStateExec.scala:128)
at scala.collection.Iterator$ConcatIteratorCell.headIterator(Iterator.scala:248)
at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:194)
at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:225)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

I fixed this by changing RocksDbStateStoreProvider.scala:
override def getRange(
    start: Option[UnsafeRow],
    end: Option[UnsafeRow]): Iterator[UnsafeRowPair] = {
  require(state == UPDATING || state == LOADED,
    s"Cannot getRange after already committed or aborted: state is $state")
  iterator()
}
2. RocksDB checkpoint creation had quite a high time cost, sometimes > 20 secs, and it became the biggest delay in many Spark tasks. Then I checked my three Spark worker nodes; it turned out the data partition was on devicemapper on one of them and on xfs on the other two. After I changed all of them to an ext4 partition, the result is much better: it's now < 10 ms in most cases, but can still sometimes be > 100 ms.

  3. All Spark executors get stuck when one of the executors tries to load a snapshot file from the Spark checkpoint. Note that this is a 3-node system with 56 CPU cores and 167 partitions, and the Spark checkpoint resides on a shared NFS partition (I know it would be better on HDFS).
    My theory is that the downloading executor tries to download the snapshot at almost the same time as another executor is writing it, so it gets an incomplete file and throws. I'm thinking we should either let it retry when this happens, or make the task fail and let the Spark scheduler rerun the task somehow.

On host C, trying to download:
2019-10-08 14:30:38.756Z INFO [Executor task launch worker for task 53012] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Will download file:/checkpoint/state/0/149/257.snapshot at location /logs/tmp/state_96281597/0/149/257.tar
2019-10-08 14:30:39.375Z INFO [Executor task launch worker for task 53012] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Loading state
for 257 and partition 149 took 626 ms.
2019-10-08 14:30:39.386Z ERROR [Executor task launch worker for task 53012] org.apache.spark.util.Utils - Aborting task
java.lang.IllegalStateException: Error while creating rocksDb instance While opening a file for sequentially reading: /logs/checkpoint/state_96281597/0/149/257/CURRENT: No
such file or directory
at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbInstance.open(RocksDbInstance.scala:65)
at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider$RocksDbStateStore.iterator(RocksDbStateStoreProvider.scala:230)
at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider$RocksDbStateStore.getRange(RocksDbStateStoreProvider.scala:151)
at org.apache.spark.sql.execution.streaming.state.FlatMapGroupsWithStateExecHelper$StateManagerImplBase.getAllState(FlatMapGroupsWithStateExecHelper.scala:107)
at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec$InputProcessor.processTimedOutState(FlatMapGroupsWithStateExec.scala:181)
at org.apache.spark.sql.execution.streaming.FlatMapGroupsWithStateExec.$anonfun$doExecute$3(FlatMapGroupsWithStateExec.scala:128)
at scala.collection.Iterator$ConcatIteratorCell.headIterator(Iterator.scala:248)
at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:194)
at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:225)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.rocksdb.RocksDBException: While opening a file for sequentially reading: /logs/checkpoint/state_96281597/0/149/257/CURRENT: No such file or directory
at org.rocksdb.RocksDB.openROnly(Native Method)
at org.rocksdb.RocksDB.openReadOnly(RocksDB.java:370)
at org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbInstance.open(RocksDbInstance.scala:55)
... 24 more

On host A, where the snapshot was created:

[geo@plno-cto-fx2s1a l]$ grep 52786 executor.log
2019-10-08 14:30:09.992Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.StateStore - Retrieved reference to StateStoreCoordinator: org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef@6e8a89d9
2019-10-08 14:30:09.992Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.StateStore - Reported that the loaded instance StateStoreProviderId(StateStoreId(file:/checkpoint/state,0,149,default),987a396d-8b24-4426-94c7-d1c67d91496b) is active
2019-10-08 14:30:09.992Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - get Store for version 256
2019-10-08 14:30:09.993Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Loading state into the db for 256 and partition 149
2019-10-08 14:30:09.993Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider - Loading state for 256 and partition 149 took 0 ms.
2019-10-08 14:30:13.840Z INFO [Executor task launch worker for task 52786] org.apache.spark.sql.execution.streaming.state.rocksdb.OptimisticTransactionDbInstance - Creating Checkpoint at /logs/checkpoint/state_96281597/0/149/257 took 444 ms.

In the Spark state directory:

[geo@plno-cto-fx2s1a 149]$ tar tvf 257.snapshot
-rw-r--r-- 0/0 144670 2019-10-08 09:30 001183.sst
-rw-r--r-- 0/0 5602748 2019-10-08 09:30 001192.sst
-rw-r--r-- 0/0 5223 2019-10-08 09:30 OPTIONS-001191
-rw-r--r-- 0/0 29864343 2019-10-08 09:30 001182.sst
-rw-r--r-- 0/0 366 2019-10-08 09:30 MANIFEST-001188
-rw-r--r-- 0/0 16 2019-10-08 09:30 CURRENT

--- Update on No. 3
No. 3 is fixed after changing getRange to this:
def getRange(
    start: Option[UnsafeRow],
    end: Option[UnsafeRow]): Iterator[UnsafeRowPair] = {
  initTransaction() // <----- change here
  require(state == UPDATING, s"Cannot getRange after already committed or aborted: state is $state")
  iterator()
}

@itsvikramagr
Author

itsvikramagr commented Oct 9, 2019

  1. We are using flatMapGroupsWithState, and it causes a failure at the beginning

Will update the PR with the fix

  2. RocksDB checkpoint creation had quite a high time cost, sometimes > 20 secs ... after I changed all of them to an ext4 partition, the result is much better: it's now < 10 ms in most cases, but can still sometimes be > 100 ms.

For isolation and data consistency, we checkpoint the RocksDB state to local disk. As you have observed, a good file system and SSD-based instance storage should be used to get the best performance.

  3. All Spark executors get stuck when one of the executors tries to load a snapshot file from the Spark checkpoint.

Great catch. Let me look at it and make appropriate changes.
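A minimal sketch of the retry approach suggested above (the helper and the backoff are hypothetical illustrations, not code from this PR or the eventual fix):

// Sketch only: re-attempt the snapshot download when it fails, e.g. because the file
// was still being written by another executor; on the last attempt the exception
// propagates so Spark can fail and reschedule the task.
def downloadWithRetry(download: () => Unit, maxAttempts: Int = 3): Unit = {
  var attempt = 1
  var done = false
  while (!done) {
    try {
      download()
      done = true
    } catch {
      case _: Exception if attempt < maxAttempts =>
        Thread.sleep(1000L * attempt) // simple linear backoff
        attempt += 1
    }
  }
}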

@PingHao

PingHao commented Oct 9, 2019

Thanks. Another issue found during an overnight run is a sporadic core dump, after which Spark detects it and starts a new executor. It happens on all 3 Spark worker nodes, at intervals of several hours. The backtrace shows it's caused by the destroyDB JNI call. Details follow:

[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".

Core was generated by `/usr/local/openjdk-11/bin/java -cp /spark/conf/:/spark/jars/:/hadoop/etc/hadoo'.
Program terminated with signal SIGABRT, Aborted.
#0 __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
51 ../sysdeps/unix/sysv/linux/raise.c: No such file or directory.
[Current thread is 1 (Thread 0x7fe3be4f7700 (LWP 80064))]
(gdb)
(gdb)
(gdb) bt
#0 __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
#1 0x00007ff7c9d1b42a in __GI_abort () at abort.c:89
#2 0x00007ff7c9469b55 in os::abort(bool, void*, void const*) () from /usr/local/openjdk-11/lib/server/libjvm.so
#3 0x00007ff7c966b826 in VMError::report_and_die(int, char const*, char const*, __va_list_tag*, Thread*, unsigned char*, void*, void*, char const*, int, unsigned long) ()
from /usr/local/openjdk-11/lib/server/libjvm.so
#4 0x00007ff7c966c7b9 in VMError::report_and_die(Thread*, unsigned int, unsigned char*, void*, void*, char const*, ...) () from /usr/local/openjdk-11/lib/server/libjvm.so
#5 0x00007ff7c966c7f1 in VMError::report_and_die(Thread*, unsigned int, unsigned char*, void*, void*) () from /usr/local/openjdk-11/lib/server/libjvm.so
#6 0x00007ff7c946f5eb in JVM_handle_linux_signal () from /usr/local/openjdk-11/lib/server/libjvm.so
#7 0x00007ff7c9464db3 in signalHandler(int, siginfo*, void*) () from /usr/local/openjdk-11/lib/server/libjvm.so
#8
#9 0x00007fe3bdbec6f5 in rocksdb::ColumnFamilyOptions::ColumnFamilyOptions(rocksdb::Options const&) () from /data/tmp/librocksdbjni8927661261690225315.so
#10 0x00007fe3bdad4299 in rocksdb::SanitizeOptions(std::string const&, rocksdb::Options const&) () from /data/tmp/librocksdbjni8927661261690225315.so
#11 0x00007fe3bda9d3f2 in rocksdb::DestroyDB(std::string const&, rocksdb::Options const&, std::vector<rocksdb::ColumnFamilyDescriptor, std::allocator<rocksdb::ColumnFamilyDescriptor> > const&) () from /data/tmp/librocksdbjni8927661261690225315.so
#12 0x00007fe3bd9fb547 in Java_org_rocksdb_RocksDB_destroyDB () from /data/tmp/librocksdbjni8927661261690225315.so
#13 0x00007ff7b45b5c8f in ?? ()
#14 0x00007fea789e1990 in ?? ()

@itsvikramagr
Author

@PingHao - Both the issues - the stuck executor and the core dump - might be due to the same reason. Will debug and fix them.

Would you have some code snippets that can help me to reproduce the problem?

@PingHao

PingHao commented Oct 9, 2019

@PingHao - Both the issues - the stuck executor and the core dump - might be due to the same reason. Will debug and fix them.

Would you have some code snippets that can help me to reproduce the problem?
My running code is difficult to isolate or share here, so here is a new test case (based on the existing test case "maintenance") in your RocksDbStateStoreSuite.scala, to try to simulate parallel Spark task operations on each partition's state store while, at the same time, the maintenance thread tries destroyDB. See the code here:

https://gist.github.com/PingHao/c20846542adda742f27ff00459fafe29#file-rocksdbstatestoresuite-scala-L384

I can reproduce the core dump on my developer machine, but I'm not sure if the logic is legit anyway.
You can change N (number of partitions) and LOOPS; I recommend N = number of CPU cores.

Member

@gatorsmile gatorsmile left a comment


Left my comment #24922 (comment)

@marmbrus
Contributor

marmbrus commented Oct 11, 2019

First of all, I think this is great. Thanks for working on it!

I tend to agree with @gatorsmile that we should consider making this feature an external package.

The argument here is not that RocksDB is a bad choice. For many workloads it is likely a great option. However, for some workloads people might want to use Cassandra, or LevelDB, or something else. Should we also allow new features to add dependencies on those?

As the Spark project continues to grow, I think it is important that we guard against the core becoming a swiss army knife, with too many different configurations for the community to maintain in the long run. In this case we are not only adding a new dependency, but we are also committing Spark to supporting the specifics of how you are packaging and uploading the RocksDB files forever.

The whole reason we added an API for state stores to plug in was to enable this kind of innovation outside of the core of Spark.

If this package becomes super popular, I would reconsider this position, similar to how Avro and CSV were eventually inlined into core Spark.

With integration in the Spark codebase, we can probably change the code in any way later, but if we take the separate-jar route, the kinds of extensions you can make are limited by the current contract. For example, @skonto mentioned one way we could abstract the state storage implementation to get the best out of RocksDB. How can we support such improvements if we take the Spark package route?

If the abstraction boundaries are wrong here, we should improve the APIs, not punch through them. I don't think this is a good argument for putting this into Spark.

@itsvikramagr
Author

itsvikramagr commented Oct 18, 2019

@marmbrus / @gatorsmile - Got your point on making it an external package. I will close the PR and the corresponding JIRA. Will update this thread once I have submitted it to https://spark-packages.org/.

Thanks, @gaborgsomogyi @HeartSaVioR @dongjoon-hyun @PingHao for looking into the PR and helping me to make significant improvements in it.

@itsvikramagr
Author

@PingHao - Both the issues - the stuck executor and the core dump - might be due to the same reason. Will debug and fix them.
Would you have some code snippets that can help me to reproduce the problem?
My running code is difficult to isolate or share here, so here is a new test case (based on the existing test case "maintenance") in your RocksDbStateStoreSuite.scala, to try to simulate parallel Spark task operations on each partition's state store while, at the same time, the maintenance thread tries destroyDB. See the code here:

https://gist.github.com/PingHao/c20846542adda742f27ff00459fafe29#file-rocksdbstatestoresuite-scala-L384

I can reproduce the core dump on my developer machine, but I'm not sure if the logic is legit anyway.
You can change N (number of partitions) and LOOPS; I recommend N = number of CPU cores.

@PingHao - I will fix the issue and post a message in your repo (https://github.com/PingHao/spark-statestore-rocksdb).

@skonto
Contributor

skonto commented Oct 18, 2019

@marmbrus @itsvikramagr Flink has supported this at the right level for them (https://github.com/apache/flink/tree/master/flink-state-backends). These backends don't change much, and the benefits of using RocksDB with checkpointing for large states are important, at least for Flink, which puts a lot of emphasis on stateful apps. There is always a trade-off anyway. Having APIs is the way to go for many parts, but a default, reference implementation is also helpful, especially if it makes a difference. Packages outside of the core project could become outdated fast, I am afraid.

@itsvikramagr
Author

I have released the code here: https://github.com/qubole/spark-state-store. The Maven artifact can be found here: https://mvnrepository.com/artifact/com.qubole.spark/spark-rocksdb-state-store.
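For reference, an external state store provider like this one is plugged in through Spark's spark.sql.streaming.stateStore.providerClass setting. A rough usage sketch (the package version and the provider class name, inferred from the stack traces earlier in this thread, are assumptions and may differ in the released artifact):

import org.apache.spark.sql.SparkSession

// Sketch only: requires the package on the classpath, e.g. via
//   --packages com.qubole.spark:spark-rocksdb-state-store_2.11:1.0.0   (coordinates illustrative)
val spark = SparkSession.builder()
  .appName("rocksdb-state-store-example")
  .config(
    "spark.sql.streaming.stateStore.providerClass",
    // class name inferred from the stack traces above; verify against the released jar
    "org.apache.spark.sql.execution.streaming.state.rocksdb.RocksDbStateStoreProvider")
  .getOrCreate()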

@dongjoon-hyun
Member

Thank you for the update, @itsvikramagr .

@edwardcapriolo

edwardcapriolo commented May 27, 2021

https://docs.databricks.com/spark/latest/structured-streaming/production.html

https://docs.databricks.com/spark/latest/structured-streaming/production.html#configure-rocksdb-state-store

"As the Spark project continues to grow, I think it is important that we guard against the core becoming a swiss army knife, with too many different configurations for the community to maintain in the long run. In this case we are not only adding a new dependency, but we are also committing the Spark to supporting the specifics of how you are packaging and uploading the RocksDB files forever."

RocksDB state store has "found its way" into a commercial offering, so it should "find its way" into mainline Spark. It is obviously being packaged as a feature by Databricks (thus proving its value).

What will happen if Spark tries to "tiptoe the open-source fence"? Folks will notice, and they will simply switch to Kafka Streams, which is hungry for a user base and offers RocksDB state OUT OF THE BOX. This is actually a conversation I had today, with someone saying "Spark can't do it, let's use Kafka Streams or Flink."
