
[SNAP-2366] row buffer fault-in, forced rollover, merge small batches #1046

Status: Open

Wants to merge 32 commits into base: master

Changes from 1 commit

Commits (32)
2972c4d  Update store link  (May 30, 2018)
ff3cca0  fix for SNAP-2365  (May 30, 2018)
608bf3d  [SNAP-2366] row buffer fault-in, forced rollover, merge small batches  (May 31, 2018)
cd21c4d  update store link  (May 31, 2018)
00f6cda  Some optimizations and fixed few issues  (Jun 4, 2018)
a1dadd9  minor change  (Jun 4, 2018)
cf1eb84  Fixing precheckin failure in SNAP-2365 and adding similar fix for…  (Jun 5, 2018)
215c6e3  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Jun 6, 2018)
7f69db0  Merge remote-tracking branch 'origin/SNAP-2365' into SNAP-2366  (Jun 6, 2018)
9595f20  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Jun 8, 2018)
88045eb  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Jun 28, 2018)
c858273  coarse container locking to fix rollover/merge when running in parall…  (Jun 28, 2018)
0ea9eb9  temp  (Jun 28, 2018)
dbdbcb7  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Jul 10, 2018)
c824cd6  minor formatting change  (Jul 10, 2018)
bf26063  fixes  (Aug 2, 2018)
f423108  fix a ClassCast  (Aug 2, 2018)
59588c0  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Aug 2, 2018)
6ec25d0  fixing putInto which clears context prematurely  (Aug 3, 2018)
4905afc  fix COMMIT call  (Aug 3, 2018)
fa3f639  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Aug 3, 2018)
778bb4b  update store link  (Aug 3, 2018)
647d243  updates and fixes  (Aug 6, 2018)
86f23d9  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Aug 10, 2018)
94c45d6  fix build issues after master merge  (Aug 10, 2018)
4fb08bf  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Aug 31, 2018)
e7960a9  minor cleanups  (Aug 31, 2018)
f2be757  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Nov 2, 2018)
872bd8b  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Nov 2, 2018)
ce7fac9  minor updates to tests  (Nov 2, 2018)
08a6956  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Dec 22, 2018)
772c4af  Merge remote-tracking branch 'origin/master' into SNAP-2366  (Dec 29, 2018)
updates and fixes
Sumedh Wale committed Aug 6, 2018
commit 647d243429687cfd15463122dd095b5815a507ff
@@ -260,13 +260,13 @@ class SplitSnappyClusterDUnitTest(s: String)
     // added in SNAP-2012
     StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT = true
     try {
-      ColumnUpdateDeleteTests.testBasicUpdate(session)
-      ColumnUpdateDeleteTests.testDeltaStats(session)
-      ColumnUpdateDeleteTests.testBasicDelete(session)
-      ColumnUpdateDeleteTests.testSNAP1925(session)
-      ColumnUpdateDeleteTests.testSNAP1926(session)
-      ColumnUpdateDeleteTests.testConcurrentOps(session)
-      ColumnUpdateDeleteTests.testSNAP2124(session, checkPruning = true)
+      ColumnUpdateDeleteTests.testBasicUpdate(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testDeltaStats(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testBasicDelete(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testSNAP1925(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testSNAP1926(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testConcurrentOps(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testSNAP2124(session, checkPruning = true, redundancy = 1)
     } finally {
       StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT = false
     }
@@ -205,13 +205,13 @@ trait SplitClusterDUnitTestBase extends Logging {
     // using random bucket assignment for cases like SNAP-2175
     StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT = true
     try {
-      ColumnUpdateDeleteTests.testBasicUpdate(session)
-      ColumnUpdateDeleteTests.testDeltaStats(session)
-      ColumnUpdateDeleteTests.testBasicDelete(session)
-      ColumnUpdateDeleteTests.testSNAP1925(session)
-      ColumnUpdateDeleteTests.testSNAP1926(session)
-      ColumnUpdateDeleteTests.testConcurrentOps(session)
-      ColumnUpdateDeleteTests.testSNAP2124(session, checkPruning = true)
+      ColumnUpdateDeleteTests.testBasicUpdate(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testDeltaStats(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testBasicDelete(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testSNAP1925(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testSNAP1926(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testConcurrentOps(session, redundancy = 1)
+      ColumnUpdateDeleteTests.testSNAP2124(session, checkPruning = true, redundancy = 1)
     } finally {
       StoreUtils.TEST_RANDOM_BUCKETID_ASSIGNMENT = false
     }
67 changes: 63 additions & 4 deletions core/src/main/scala/io/snappydata/functions.scala
@@ -31,11 +31,70 @@ import org.apache.spark.unsafe.types.UTF8String
  */
 object SnappyDataFunctions {
 
-  val usageStr = "_FUNC_() - Returns the unique distributed member" +
-      " ID of the server containing the row."
-
   def registerSnappyFunctions(functionRegistry: FunctionRegistry): Unit = {
-    val info = new ExpressionInfo(DSID.getClass.getCanonicalName, null, "DSID", usageStr, "")
+    var usageStr = ""
+    var extendedStr = ""
+    var info: ExpressionInfo = null
+
+    // below are in-built operators additionally handled in snappydata over spark
+    // which are listed so they can appear in describe function
+
+    // --- BEGIN OPERATORS ---
+
+    usageStr = "expr1 _FUNC_ expr2 - Bitwise left shift `expr1` by `expr2`."
+    extendedStr = """
+    Examples:
+      > SELECT 15 _FUNC_ 2;
+       60
+    """
+    info = new ExpressionInfo("", null, "<<", usageStr, extendedStr)
+
+    usageStr = "expr1 _FUNC_ expr2 - Bitwise arithmetic right shift `expr1` by `expr2`."
+    extendedStr = """
+    Examples:
+      > SELECT 15 _FUNC_ 2;
+       3
+      > SELECT -15 _FUNC_ 2;
+       -4
+    """
+    info = new ExpressionInfo("", null, ">>", usageStr, extendedStr)
+
+    usageStr = "expr1 _FUNC_ expr2 - Bitwise logical right shift `expr1` by `expr2`."
+    extendedStr = """
+    Examples:
+      > SELECT 15 _FUNC_ 2;
+       3
+      > SELECT -15 _FUNC_ 2;
+       1073741820
+    """
+    info = new ExpressionInfo("", null, ">>>", usageStr, extendedStr)
+
+    usageStr = "str1 || str2 - Returns the concatenation of str1 and str2."
+    extendedStr = """
+    Examples:
+      > SELECT 'Spark' _FUNC_ 'SQL';
+       SparkSQL
+    """
+    info = new ExpressionInfo("", null, "||", usageStr, extendedStr)
+
+    // --- END OPERATORS ---
+
+    usageStr = "_FUNC_() - Returns the unique distributed member " +
+        "ID of the server containing the current row being fetched."
+    extendedStr = """
+    Examples:
+      > SELECT _FUNC_, ID FROM RANGE(1, 10);
+       127.0.0.1(25167)<v2>:16171|1
+       127.0.0.1(25167)<v2>:16171|2
+       127.0.0.1(25167)<v2>:16171|3
+       127.0.0.1(25167)<v2>:16171|4
+       127.0.0.1(25078)<v1>:13152|5
+       127.0.0.1(25078)<v1>:13152|6
+       127.0.0.1(25078)<v1>:13152|7
+       127.0.0.1(25078)<v1>:13152|8
+       127.0.0.1(25167)<v2>:16171|9
+    """
+    info = new ExpressionInfo(DSID.getClass.getCanonicalName, null, "DSID", usageStr, extendedStr)
     functionRegistry.registerFunction("DSID", info, _ => DSID())
   }
 }
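
The example values embedded in the extended-usage strings above follow ordinary JVM bitwise and string semantics, so they can be sanity-checked with plain Scala. This is a standalone illustration for the reader, not code from this PR:

  object ShiftExamples {
    def main(args: Array[String]): Unit = {
      println(15 << 2)         // 60  (bitwise left shift)
      println(15 >> 2)         // 3   (arithmetic right shift)
      println(-15 >> 2)        // -4  (sign bit preserved)
      println(-15 >>> 2)       // 1073741820 (logical right shift on a 32-bit Int)
      println("Spark" + "SQL") // SparkSQL (the || concatenation example)
    }
  }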
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/SnappyDDLParser.scala
@@ -549,6 +549,19 @@ abstract class SnappyDDLParser(session: SparkSession)
         .asInstanceOf[Option[Boolean]].isDefined, isFormatted = false))
   }
 
+  protected def describeFunction: Rule1[LogicalPlan] = rule {
+    DESCRIBE ~ (EXTENDED ~ push(true)).? ~ (functionIdentifier | stringLiteral |
+        capture("==" | "=" | "!=" | "<>" | ">=" | ">>" | ">>>" | ">" | "<=>" | "<=" | "<<" | "<" |
+            "+" | "-" | "*" | "/" | "%" | "~" | "&" | "||" | "|" | "~" |
+            OR | AND | IN | NOT)) ~> { (extended: Any, ident: Any) =>
+      val functionIdent = ident match {
+        case f: FunctionIdentifier => f
+        case s: String => FunctionIdentifier(s, database = None)
+      }
+      DescribeFunctionCommand(functionIdent, extended.asInstanceOf[Option[Boolean]].isDefined)
+    }
+  }
+
   protected def refreshTable: Rule1[LogicalPlan] = rule {
     REFRESH ~ TABLE ~ tableIdentifier ~> RefreshTable
   }
@@ -810,7 +823,7 @@ case class DeployCommand(
       Misc.checkIfCacheClosing(ex)
       if (restart) {
         logWarning(s"Following mvn coordinate" +
-            s" could not be resolved during restart: ${coordinates}", ex)
+            s" could not be resolved during restart: $coordinates", ex)
         if (lang.Boolean.parseBoolean(System.getProperty("FAIL_ON_JAR_UNAVAILABILITY", "true"))) {
           throw ex
         }
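
With the operator entries registered in functions.scala and the describeFunction rule above, DESCRIBE FUNCTION should accept operator tokens as well as regular function identifiers. A minimal usage sketch, assuming an already-created SnappySession named `session` (the calls and output shape are illustrative, not taken from this PR):

  // DESCRIBE FUNCTION on an operator and on the built-in DSID function
  session.sql("DESCRIBE FUNCTION EXTENDED <<").show(truncate = false)
  session.sql("DESCRIBE FUNCTION EXTENDED DSID").show(truncate = false)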
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/SnappyParser.scala
@@ -373,7 +373,9 @@ class SnappyParser(session: SnappySession)
 
   protected final def comparisonExpression: Rule1[Expression] = rule {
     termExpression ~ (
-        '=' ~ ws ~ termExpression ~> EqualTo |
+        '=' ~ (
+            '=' ~ ws ~ termExpression ~> EqualTo |
+            ws ~ termExpression ~> EqualTo) |
         '>' ~ (
             '=' ~ ws ~ termExpression ~> GreaterThanOrEqual |
             '>' ~ (
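
The nested alternative above lets `==` parse as equality in addition to the single `=` handled before. A small illustrative pair of queries, assuming a SnappySession named `session`; both predicates should now produce the same result:

  // `=` and `==` as equality predicates
  session.sql("SELECT id FROM range(10) WHERE id = 3").show()
  session.sql("SELECT id FROM range(10) WHERE id == 3").show()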
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/SnappySession.scala
@@ -334,7 +334,7 @@ class SnappySession(_sc: SparkContext) extends SparkSession(_sc) {
       case Some(context) => context.persist = persist; context.objects
     }
     // use a unique lock owner
-    val lockOwner = s"MUTABLE_OP_OWNER_$id.${System.nanoTime()}"
+    val lockOwner = s"READ_${SnappySession.MUTABLE_OWNER_PREFIX}_$id.${System.nanoTime()}"
     opContext.put(SnappySession.MUTABLE_PLAN_TABLE, qualifiedTableName)
     opContext.put(SnappySession.MUTABLE_PLAN_OWNER, lockOwner)
   }
@@ -1931,6 +1931,9 @@ object SnappySession extends Logging {
   /** internal property to indicate update/delete/putInto execution and lock owner for the same */
   private[sql] val MUTABLE_PLAN_OWNER = "snappydata.internal.mutablePlanOwner"
 
+  /** a unique UUID of the node for mutability lock ownership */
+  private[sql] lazy val MUTABLE_OWNER_PREFIX = java.util.UUID.randomUUID().toString
+
   private[sql] var tokenize: Boolean = _
 
   lazy val isEnterpriseEdition: Boolean = {
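
The lock-owner string now combines a per-JVM UUID prefix with the session id and a nanosecond timestamp, which keeps read-side owners distinct across nodes as well as across sessions on the same node. A self-contained sketch of that naming scheme (object and method names here are illustrative, not the SnappyData internals):

  import java.util.UUID

  object LockOwnerNaming {
    // one UUID per JVM, mirroring SnappySession.MUTABLE_OWNER_PREFIX
    lazy val ownerPrefix: String = UUID.randomUUID().toString

    // build a read-side owner string for a given session id
    def readLockOwner(sessionId: Long): String =
      s"READ_${ownerPrefix}_$sessionId.${System.nanoTime()}"

    def main(args: Array[String]): Unit = {
      // prints something like READ_6f1c2e7a-...-..._42.123456789012
      println(readLockOwner(42L))
    }
  }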
@@ -30,10 +30,11 @@ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.collection.Utils
 import org.apache.spark.sql.execution.columnar.impl.{BaseColumnFormatRelation, IndexColumnFormatRelation}
-import org.apache.spark.sql.execution.columnar.{ColumnTableScan, ConnectionType}
+import org.apache.spark.sql.execution.columnar.{ColumnDeleteExec, ColumnPutIntoExec, ColumnTableScan, ColumnUpdateExec, ConnectionType}
 import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchange}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetricInfo, SQLMetrics}
 import org.apache.spark.sql.execution.row.{RowFormatRelation, RowTableScan}
+import org.apache.spark.sql.internal.ColumnTableBulkOps
 import org.apache.spark.sql.sources.{BaseRelation, PrunedUnsafeFilteredScan, SamplingRelation}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{AnalysisException, CachedDataFrame, SnappySession}
@@ -235,7 +236,33 @@ case class ExecutePlan(child: SparkPlan, preAction: () => Unit = () => ())
     Array.concat(rows: _*)
   }
 
+  private val (mutableTable, lockOwner) = {
+    val session = sqlContext.sparkSession.asInstanceOf[SnappySession]
+    session.getMutablePlanTable match {
+      case null => (null, null)
+      case table => (table, session.getMutablePlanOwner)
+    }
+  }
+
   protected[sql] lazy val sideEffectResult: Array[InternalRow] = {
+    try {
+      getSideEffectResult
+    } finally {
+      // release locks at the end of update/delete/putInto
+      if ((mutableTable ne null) && (lockOwner ne null)) {
+        ColumnTableBulkOps.releaseBucketMaintenanceLocks(mutableTable, lockOwner, () =>
+          SnappySession.getExecutedPlan(child) match {
+            case (u: ColumnUpdateExec, _) => u.connProps
+            case (d: ColumnDeleteExec, _) => d.connProps
+            case (p: ColumnPutIntoExec, _) => p.updatePlan.asInstanceOf[ColumnUpdateExec].connProps
+            case _ => throw new IllegalStateException(
+              s"Unexpected plan for ${child.getClass.getName}: $child")
+          }, sparkContext)
+      }
+    }
+  }
+
+  private def getSideEffectResult: Array[InternalRow] = {
     val session = sqlContext.sparkSession.asInstanceOf[SnappySession]
     val sc = session.sparkContext
     val key = session.currentKey
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.columnar
 
 import java.sql.Connection
 
-import org.apache.spark.sql.SnappySession
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.collection.Utils
 import org.apache.spark.sql.execution.columnar.impl.{JDBCSourceAsColumnarStore, SnapshotConnectionListener}
@@ -40,10 +39,7 @@ trait ColumnExec extends RowExec {
 
   override protected def connectionCodes(ctx: CodegenContext): (String, String, String) = {
     val connectionClass = classOf[Connection].getName
-    val session = sqlContext.sparkSession.asInstanceOf[SnappySession]
     val externalStoreTerm = ctx.addReferenceObj("externalStore", externalStore)
-    val updateOwner = ctx.addReferenceObj("updateOwner",
-      session.getMutablePlanOwner, classOf[String].getName)
     val listenerClass = classOf[SnapshotConnectionListener].getName
     val storeClass = classOf[JDBCSourceAsColumnarStore].getName
     taskListener = ctx.freshName("taskListener")
@@ -55,8 +51,7 @@
 
     val initCode =
       s"""
-         |$taskListener = new $listenerClass(($storeClass)$externalStoreTerm,
-         |  $delayRollover, $updateOwner);
+         |$taskListener = new $listenerClass(($storeClass)$externalStoreTerm, $delayRollover);
         |$connTerm = $taskListener.getConn();
         |if ($getContext() != null) {
        |  $getContext().addTaskCompletionListener($taskListener);
@@ -588,14 +588,14 @@ case class ColumnInsertExec(child: SparkPlan, partitionColumns: Seq[String],
     ctx.addNewFunction(commitSnapshotTx,
       s"""
          |private final void $commitSnapshotTx(String $txId, scala.Option $conn) {
-         |  $externalStoreTerm.commitTx($txId, false, null, -1, $conn);
+         |  $externalStoreTerm.commitTx($txId, false, $conn);
          |}
       """.stripMargin)
     rollbackSnapshotTx = ctx.freshName("rollbackSnapshotTx")
     ctx.addNewFunction(rollbackSnapshotTx,
       s"""
          |private final void $rollbackSnapshotTx(String $txId, scala.Option $conn) {
-         |  $externalStoreTerm.rollbackTx($txId, null, -1, $conn);
+         |  $externalStoreTerm.rollbackTx($txId, $conn);
          |}
       """.stripMargin)
     closeConnection = ctx.freshName("closeConnection")
@@ -725,14 +725,14 @@ case class ColumnInsertExec(child: SparkPlan, partitionColumns: Seq[String],
     ctx.addNewFunction(commitSnapshotTx,
       s"""
          |private final void $commitSnapshotTx(String $txId, scala.Option $conn) {
-         |  $externalStoreTerm.commitTx($txId, false, null, -1, $conn);
+         |  $externalStoreTerm.commitTx($txId, false, $conn);
          |}
       """.stripMargin)
     rollbackSnapshotTx = ctx.freshName("rollbackSnapshotTx")
     ctx.addNewFunction(rollbackSnapshotTx,
       s"""
          |private final void $rollbackSnapshotTx(String $txId, scala.Option $conn) {
-         |  $externalStoreTerm.rollbackTx($txId, null, -1, $conn);
+         |  $externalStoreTerm.rollbackTx($txId, $conn);
          |}
       """.stripMargin)
     closeConnection = ctx.freshName("closeConnection")