# [SC-5492] Fix drop table command in ACL enabled Spark
## What changes were proposed in this pull request?
Drop table commands (and probably other create/drop commands) currently fail with a "table does not exist" exception when using Thrift with ACLs enabled.

This bug is caused by the following factors:

1. Thrift always executes an action on a dataset by calling `collect()`.
2. A `Dataset` for a command is executed eagerly. As soon as we create a `Dataset` that contains a `DropTableCommand`, the given table is dropped.
3. When you execute an action on a `Dataset`, it creates a new `DataFrame` to track the query execution. The created `DataFrame` re-checks analysis, which triggers the ACL `CheckPermissions` rule; the rule fails because it cannot find the table to be dropped (it was already dropped when the `Dataset` was eagerly executed). The sketch below illustrates this failing sequence.
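
A minimal sketch of the failing sequence described above (the table name and the exact call site are illustrative; the real path goes through the Thrift server's result collection):

```scala
// Factor 2: building the Dataset for the command runs it eagerly -- the table is
// dropped at this point.
val result = spark.sql("DROP TABLE some_table")  // some_table is a placeholder

// Factors 1 and 3: the Thrift server then materializes the result with collect().
// Before this fix, collect() wrapped the plan in a fresh DataFrame, whose analysis
// re-ran the ACL CheckPermissions rule. The rule looked up some_table, which no
// longer exists, so the action failed with a "table does not exist" error.
result.collect()
```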

This PR fixes the issue by modifying the `Dataset` actions: they no longer spin off a new `DataFrame`; instead, the Dataset's own `queryExecution` is used directly when evaluating an action. This also removes some code duplication in `Dataset` action evaluation by merging the typed and untyped code paths. The changes to `Dataset` will be contributed to Apache Spark; I have created https://issues.apache.org/jira/browse/SPARK-19070 to track this.
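
Condensed from the `Dataset.scala` diff below, the `collect()` action before and after the change:

```scala
// Before: the action went through withCallback, and the toDF() call created a new
// DataFrame whose analysis re-ran the ACL checks.
def collect(): Array[T] = collect(needCallback = true)

private def collect(needCallback: Boolean): Array[T] = {
  def execute(): Array[T] = withNewExecutionId {
    queryExecution.executedPlan.executeCollect().map(boundEnc.fromRow)
  }
  if (needCallback) withCallback("collect", toDF())(_ => execute()) else execute()
}

// After: the action evaluates the Dataset's already-analyzed queryExecution directly.
def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)
```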

## How was this patch tested?
I have added a regression test to `HiveThriftServer2Suites` and expanded the `base` scenario in the Thrift ACL end-to-end tests.
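
A hypothetical sketch of the kind of regression test described; the actual test added to `HiveThriftServer2Suites` is not part of the excerpt shown below, and the `withJdbcStatement` helper, test name, and table name are assumptions:

```scala
test("SC-5492: drop table through the Thrift server with ACLs enabled") {
  // withJdbcStatement is assumed here to be the suite's existing JDBC helper.
  withJdbcStatement { statement =>
    statement.execute("CREATE TABLE sc_5492_tbl(id INT)")
    // Before the fix this failed with a "table does not exist" error, because the
    // DROP ran eagerly and collect() re-ran analysis (and the ACL check) afterwards.
    statement.execute("DROP TABLE sc_5492_tbl")
  }
}
```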

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes apache#160 from hvanhovell/SC-5492.
hvanhovell committed Jan 6, 2017
1 parent c836d98 commit 56dafc3
Showing 5 changed files with 214 additions and 142 deletions.
76 changes: 26 additions & 50 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand, GlobalTempView, LocalTempView}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.python.EvaluatePython
@@ -2109,9 +2109,7 @@ class Dataset[T] private[sql](
* @group action
* @since 1.6.0
*/
def head(n: Int): Array[T] = withTypedCallback("head", limit(n)) { df =>
df.collect(needCallback = false)
}
def head(n: Int): Array[T] = withAction("head", limit(n).queryExecution)(collectFromPlan)

/**
* Returns the first row.
@@ -2338,7 +2336,7 @@
def takeAsList(n: Int): java.util.List[T] = java.util.Arrays.asList(take(n) : _*)

/**
* Returns an array that contains all of [[Row]]s in this Dataset.
* Returns an array that contains all rows in this Dataset.
*
* Running collect requires moving all the data into the application's driver process, and
* doing so on a very large dataset can crash the driver process with OutOfMemoryError.
@@ -2348,38 +2346,24 @@
* @group action
* @since 1.6.0
*/
def collect(): Array[T] = collect(needCallback = true)
def collect(): Array[T] = withAction("collect", queryExecution)(collectFromPlan)

/**
* Returns a Java list that contains all of [[Row]]s in this Dataset.
* Returns a Java list that contains all rows in this Dataset.
*
* Running collect requires moving all the data into the application's driver process, and
* doing so on a very large dataset can crash the driver process with OutOfMemoryError.
*
* @group action
* @since 1.6.0
*/
def collectAsList(): java.util.List[T] = withCallback("collectAsList", toDF()) { _ =>
withNewExecutionId {
val values = queryExecution.executedPlan.executeCollect().map(boundEnc.fromRow)
java.util.Arrays.asList(values : _*)
}
}

private def collect(needCallback: Boolean): Array[T] = {
def execute(): Array[T] = withNewExecutionId {
queryExecution.executedPlan.executeCollect().map(boundEnc.fromRow)
}

if (needCallback) {
withCallback("collect", toDF())(_ => execute())
} else {
execute()
}
def collectAsList(): java.util.List[T] = withAction("collectAsList", queryExecution) { plan =>
val values = collectFromPlan(plan)
java.util.Arrays.asList(values : _*)
}

/**
* Return an iterator that contains all of [[Row]]s in this Dataset.
* Return an iterator that contains all rows in this Dataset.
*
* The iterator will consume as much memory as the largest partition in this Dataset.
*
@@ -2390,9 +2374,9 @@
* @group action
* @since 2.0.0
*/
def toLocalIterator(): java.util.Iterator[T] = withCallback("toLocalIterator", toDF()) { _ =>
withNewExecutionId {
queryExecution.executedPlan.executeToIterator().map(boundEnc.fromRow).asJava
def toLocalIterator(): java.util.Iterator[T] = {
withAction("toLocalIterator", queryExecution) { plan =>
plan.executeToIterator().map(boundEnc.fromRow).asJava
}
}

@@ -2401,8 +2385,8 @@
* @group action
* @since 1.6.0
*/
def count(): Long = withCallback("count", groupBy().count()) { df =>
df.collect(needCallback = false).head.getLong(0)
def count(): Long = withAction("count", groupBy().count().queryExecution) { plan =>
plan.executeCollect().head.getLong(0)
}

/**
@@ -2769,38 +2753,30 @@
* Wrap a Dataset action to track the QueryExecution and time cost, then report to the
* user-registered callback functions.
*/
private def withCallback[U](name: String, df: DataFrame)(action: DataFrame => U) = {
private def withAction[U](name: String, qe: QueryExecution)(action: SparkPlan => U) = {
try {
df.queryExecution.executedPlan.foreach { plan =>
qe.executedPlan.foreach { plan =>
plan.resetMetrics()
}
val start = System.nanoTime()
val result = action(df)
val result = SQLExecution.withNewExecutionId(sparkSession, qe) {
action(qe.executedPlan)
}
val end = System.nanoTime()
sparkSession.listenerManager.onSuccess(name, df.queryExecution, end - start)
sparkSession.listenerManager.onSuccess(name, qe, end - start)
result
} catch {
case e: Exception =>
sparkSession.listenerManager.onFailure(name, df.queryExecution, e)
sparkSession.listenerManager.onFailure(name, qe, e)
throw e
}
}

private def withTypedCallback[A, B](name: String, ds: Dataset[A])(action: Dataset[A] => B) = {
try {
ds.queryExecution.executedPlan.foreach { plan =>
plan.resetMetrics()
}
val start = System.nanoTime()
val result = action(ds)
val end = System.nanoTime()
sparkSession.listenerManager.onSuccess(name, ds.queryExecution, end - start)
result
} catch {
case e: Exception =>
sparkSession.listenerManager.onFailure(name, ds.queryExecution, e)
throw e
}
/**
* Collect all elements from a spark plan.
*/
private def collectFromPlan(plan: SparkPlan): Array[T] = {
plan.executeCollect().map(boundEnc.fromRow)
}

private def sortInternal(global: Boolean, sortExprs: Seq[Column]): Dataset[T] = {
@@ -20,8 +20,20 @@ show grant usr1 on db1.vw1;
show tables in db1;
select * from db1.vw1;

-- Revoke rights
@super;
revoke all privileges on db1.vw1 from usr1;
revoke all privileges on database db1 from usr1;

-- Fail selecting data for usr1.
@usr1;
show grant usr1 on db1.vw1;
show tables in db1;
select * from db1.vw1;

-- Cleanup
@super;
drop database db1 cascade;
drop view db1.vw1;
drop database db1;
show databases;
msck repair database __all__ privileges;
@@ -1,5 +1,5 @@
-- Automatically generated by ThriftEndToEndAclTestSuite
-- Number of queries: 12
-- Number of queries: 18


-- !query 0
@@ -111,7 +111,7 @@ struct<id:bigint>


-- !query 9
drop database db1 cascade
revoke all privileges on db1.vw1 from usr1
-- !query 9 token
super
-- !query 9 schema
@@ -121,20 +121,80 @@ struct<Result:string>


-- !query 10
show databases
revoke all privileges on database db1 from usr1
-- !query 10 token
super
-- !query 10 schema
struct<databaseName:string>
struct<Result:string>
-- !query 10 output
[default]



-- !query 11
msck repair database __all__ privileges
show grant usr1 on db1.vw1
-- !query 11 token
super
usr1
-- !query 11 schema
struct<Result:string>
struct<Principal:string,ActionType:string,ObjectType:string,ObjectKey:string>
-- !query 11 output



-- !query 12
show tables in db1
-- !query 12 token
usr1
-- !query 12 schema
struct<>
-- !query 12 output
java.lang.SecurityException: Principal is not authorized to execute the given query


-- !query 13
select * from db1.vw1
-- !query 13 token
usr1
-- !query 13 schema
struct<>
-- !query 13 output
java.lang.SecurityException: Principal is not authorized to execute the given query


-- !query 14
drop view db1.vw1
-- !query 14 token
super
-- !query 14 schema
struct<Result:string>
-- !query 14 output



-- !query 15
drop database db1
-- !query 15 token
super
-- !query 15 schema
struct<Result:string>
-- !query 15 output



-- !query 16
show databases
-- !query 16 token
super
-- !query 16 schema
struct<databaseName:string>
-- !query 16 output
[default]


-- !query 17
msck repair database __all__ privileges
-- !query 17 token
super
-- !query 17 schema
struct<Result:string>
-- !query 17 output

@@ -770,17 +770,11 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
val tempLog4jConf = Utils.createTempDir().getCanonicalPath

Files.write(
"""log4j.rootCategory=DEBUG, console, file
"""log4j.rootCategory=DEBUG, console
|log4j.appender.console=org.apache.log4j.ConsoleAppender
|log4j.appender.console.target=System.err
|log4j.appender.console.layout=org.apache.log4j.PatternLayout
|log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
|log4j.appender.file=org.apache.log4j.FileAppender
|log4j.appender.file.append=false
|log4j.appender.file.file=/Users/hvanhovell/thrift.log
|log4j.appender.file.threshold=DEBUG
|log4j.appender.file.layout=org.apache.log4j.PatternLayout
|log4j.appender.file.layout.ConversionPattern=%-5p %c: %m%n
""".stripMargin,
new File(s"$tempLog4jConf/log4j.properties"),
StandardCharsets.UTF_8)