From e0062e596f9d9d635f12629e5b6def4939e82302 Mon Sep 17 00:00:00 2001 From: Vivek Bhaskar Date: Wed, 16 Nov 2016 12:25:17 +0530 Subject: [PATCH] Enabling redundancy for sample table being same as its base table. (continue 1) --- .../apache/spark/sql/internal/SnappySessionState.scala | 3 +++ .../scala/org/apache/spark/sql/store/StoreInitRDD.scala | 9 +++++++-- store | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionState.scala b/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionState.scala index 05b584bbc7..ac316b5d65 100644 --- a/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionState.scala +++ b/core/src/main/scala/org/apache/spark/sql/internal/SnappySessionState.scala @@ -180,6 +180,9 @@ class SnappySessionState(snappySession: SnappySession) def getTablePartitions(region: CacheDistributionAdvisee): Array[Partition] = StoreUtils.getPartitionsReplicatedTable(snappySession, region) + + def getStratumReservoir(): Class[_] = null + def getStratumCache(): Class[_] = null } private[sql] class SnappyConf(@transient val session: SnappySession) diff --git a/core/src/main/scala/org/apache/spark/sql/store/StoreInitRDD.scala b/core/src/main/scala/org/apache/spark/sql/store/StoreInitRDD.scala index 360fe62d18..948cbb86df 100644 --- a/core/src/main/scala/org/apache/spark/sql/store/StoreInitRDD.scala +++ b/core/src/main/scala/org/apache/spark/sql/store/StoreInitRDD.scala @@ -25,7 +25,7 @@ import com.pivotal.gemfirexd.internal.engine.Misc import io.snappydata.Constant import org.apache.spark.rdd.RDD -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{SnappyContext, SQLContext} import org.apache.spark.sql.collection.{ExecutorLocalPartition, Utils} import org.apache.spark.sql.execution.columnar.impl.StoreCallbacksImpl.ExecutorCatalogEntry import org.apache.spark.sql.execution.columnar.ExternalStoreUtils @@ -48,6 +48,11 @@ class StoreInitRDD(@transient private val sqlContext: SQLContext, extends RDD[(InternalDistributedMember, BlockManagerId)]( sqlContext.sparkContext, Nil) { + val scClass = sqlContext match { + case s: SnappyContext => s.sessionState.getStratumCache() + case _ => null + } + val isLoner = Utils.isLoner(sqlContext.sparkContext) val userCompression = sqlContext.conf.useCompression val columnBatchSize = sqlContext.conf.columnBatchSize @@ -59,7 +64,7 @@ class StoreInitRDD(@transient private val sqlContext: SQLContext, override def compute(split: Partition, context: TaskContext): Iterator[(InternalDistributedMember, BlockManagerId)] = { - DSFIDFactory.aqpRegister("StoreInitRDD") + DSFIDFactory.aqpRegister(scClass) // TODO:Suranjan Hackish as we have to register this store at each // executor, for storing the CachedBatch we are creating diff --git a/store b/store index 8349e7d7ec..c68ba9a0a3 160000 --- a/store +++ b/store @@ -1 +1 @@ -Subproject commit 8349e7d7ecd951d77c13049eb7a456a789065eba +Subproject commit c68ba9a0a38d47d4eb0b6f5986822277f721d66a