Skip to content

Commit

Permalink
Enabling redundancy for sample table being same as its base table. (c…
Browse files Browse the repository at this point in the history
…ontinue 1)
  • Loading branch information
Vivek Bhaskar committed Nov 16, 2016
1 parent 6d99192 commit e0062e5
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ class SnappySessionState(snappySession: SnappySession)

def getTablePartitions(region: CacheDistributionAdvisee): Array[Partition] =
StoreUtils.getPartitionsReplicatedTable(snappySession, region)

def getStratumReservoir(): Class[_] = null
def getStratumCache(): Class[_] = null
}

private[sql] class SnappyConf(@transient val session: SnappySession)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import com.pivotal.gemfirexd.internal.engine.Misc
import io.snappydata.Constant

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SnappyContext, SQLContext}
import org.apache.spark.sql.collection.{ExecutorLocalPartition, Utils}
import org.apache.spark.sql.execution.columnar.impl.StoreCallbacksImpl.ExecutorCatalogEntry
import org.apache.spark.sql.execution.columnar.ExternalStoreUtils
Expand All @@ -48,6 +48,11 @@ class StoreInitRDD(@transient private val sqlContext: SQLContext,
extends RDD[(InternalDistributedMember, BlockManagerId)](
sqlContext.sparkContext, Nil) {

val scClass = sqlContext match {
case s: SnappyContext => s.sessionState.getStratumCache()
case _ => null
}

val isLoner = Utils.isLoner(sqlContext.sparkContext)
val userCompression = sqlContext.conf.useCompression
val columnBatchSize = sqlContext.conf.columnBatchSize
Expand All @@ -59,7 +64,7 @@ class StoreInitRDD(@transient private val sqlContext: SQLContext,
override def compute(split: Partition,
context: TaskContext): Iterator[(InternalDistributedMember,
BlockManagerId)] = {
DSFIDFactory.aqpRegister("StoreInitRDD")
DSFIDFactory.aqpRegister(scClass)

// TODO:Suranjan Hackish as we have to register this store at each
// executor, for storing the CachedBatch we are creating
Expand Down
2 changes: 1 addition & 1 deletion store
Submodule store updated from 8349e7 to c68ba9

0 comments on commit e0062e5

Please sign in to comment.