diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 9fb823abaa3ab..547834c7f9e3a 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2390,6 +2390,11 @@ Here are the configs regarding to RocksDB instance of the state store provider:
Allow the rocksdb runtime to use fallocate to pre-allocate disk space for logs, etc... Disable for apps that have many smaller state stores to trade off disk space for write performance. |
true |
+
+ spark.sql.streaming.stateStore.rocksdb.compression |
+ Compression type used in RocksDB. The string is converted RocksDB compression type through RocksDB Java API getCompressionType(). |
+ lz4 |
+
##### RocksDB State Store Memory Management
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 1e3f3a67f16f6..40f63c86a6a4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.rocksdb.{RocksDB => NativeRocksDB, _}
+import org.rocksdb.CompressionType._
import org.rocksdb.TickerType._
import org.apache.spark.TaskContext
@@ -91,7 +92,7 @@ class RocksDB(
tableFormatConfig.setPinL0FilterAndIndexBlocksInCache(true)
}
- private val columnFamilyOptions = new ColumnFamilyOptions()
+ private[state] val columnFamilyOptions = new ColumnFamilyOptions()
// Set RocksDB options around MemTable memory usage. By default, we let RocksDB
// use its internal default values for these settings.
@@ -103,6 +104,8 @@ class RocksDB(
columnFamilyOptions.setMaxWriteBufferNumber(conf.maxWriteBufferNumber)
}
+ columnFamilyOptions.setCompressionType(getCompressionType(conf.compression))
+
private val dbOptions =
new Options(new DBOptions(), columnFamilyOptions) // options to open the RocksDB
@@ -676,7 +679,8 @@ case class RocksDBConf(
writeBufferCacheRatio: Double,
highPriorityPoolRatio: Double,
compressionCodec: String,
- allowFAllocate: Boolean)
+ allowFAllocate: Boolean,
+ compression: String)
object RocksDBConf {
/** Common prefix of all confs in SQLConf that affects RocksDB */
@@ -767,6 +771,10 @@ object RocksDBConf {
val ALLOW_FALLOCATE_CONF_KEY = "allowFAllocate"
private val ALLOW_FALLOCATE_CONF = SQLConfEntry(ALLOW_FALLOCATE_CONF_KEY, "true")
+ // Passed as the compression type to the RocksDB instance.
+ val COMPRESSION_KEY = "compression"
+ private val COMPRESSION_CONF = SQLConfEntry(COMPRESSION_KEY, "lz4")
+
def apply(storeConf: StateStoreConf): RocksDBConf = {
val sqlConfs = CaseInsensitiveMap[String](storeConf.sqlConfs)
val extraConfs = CaseInsensitiveMap[String](storeConf.extraOptions)
@@ -826,6 +834,14 @@ object RocksDBConf {
}
}
+ def getStringConf(conf: ConfEntry): String = {
+ Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toString } getOrElse {
+ throw new IllegalArgumentException(
+ s"Invalid value for '${conf.fullName}', must be a string"
+ )
+ }
+ }
+
RocksDBConf(
storeConf.minVersionsToRetain,
storeConf.minDeltasForSnapshot,
@@ -845,7 +861,8 @@ object RocksDBConf {
getRatioConf(WRITE_BUFFER_CACHE_RATIO_CONF),
getRatioConf(HIGH_PRIORITY_POOL_RATIO_CONF),
storeConf.compressionCodec,
- getBooleanConf(ALLOW_FALLOCATE_CONF))
+ getBooleanConf(ALLOW_FALLOCATE_CONF),
+ getStringConf(COMPRESSION_CONF))
}
def apply(): RocksDBConf = apply(new StateStoreConf())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index 4ce344d4e73a3..a6e65825a5bca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -87,6 +87,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".maxWriteBufferNumber", "3"),
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".writeBufferSizeMB", "16"),
(RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".allowFAllocate", "false"),
+ (RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".compression", "zstd"),
(SQLConf.STATE_STORE_ROCKSDB_FORMAT_VERSION.key, "4")
)
testConfs.foreach { case (k, v) => spark.conf.set(k, v) }
@@ -117,6 +118,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(rocksDBConfInTask.maxWriteBufferNumber == 3)
assert(rocksDBConfInTask.writeBufferSizeMB == 16L)
assert(rocksDBConfInTask.allowFAllocate == false)
+ assert(rocksDBConfInTask.compression == "zstd")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
index 15d35bae700fe..ac50e55202919 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala
@@ -24,6 +24,7 @@ import scala.language.implicitConversions
import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
+import org.rocksdb.CompressionType
import org.scalactic.source.Position
import org.scalatest.Tag
@@ -373,6 +374,21 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}
}
+ test("RocksDB: compression conf") {
+ val remoteDir = Utils.createTempDir().toString
+ new File(remoteDir).delete() // to make sure that the directory gets created
+
+ val conf = RocksDBConf().copy(compression = "zstd")
+ withDB(remoteDir, conf = conf) { db =>
+ assert(db.columnFamilyOptions.compressionType() == CompressionType.ZSTD_COMPRESSION)
+ }
+
+ // Test the default is LZ4
+ withDB(remoteDir, conf = RocksDBConf().copy()) { db =>
+ assert(db.columnFamilyOptions.compressionType() == CompressionType.LZ4_COMPRESSION)
+ }
+ }
+
test("RocksDB: get, put, iterator, commit, load") {
def testOps(compactOnCommit: Boolean): Unit = {
val remoteDir = Utils.createTempDir().toString