From 7bb9a521f35eb19576c6cc2da3fd385910270e46 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Tue, 13 May 2014 23:24:51 -0700
Subject: [PATCH 1/3] Revert "[SPARK-1784] Add a new partitioner to allow specifying # of keys per partition"

This reverts commit 92cebada09a7e5a00ab48bcb350a9462949c33eb.
---
 .../scala/org/apache/spark/Partitioner.scala  | 61 --------------------
 .../org/apache/spark/PartitioningSuite.scala  | 34 -----------
 2 files changed, 95 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 62747960618a9..9155159cf6aeb 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,64 +156,3 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       false
   }
 }
-
-/**
- * A [[org.apache.spark.Partitioner]] that partitions records into specified bounds
- * Default value is 1000. Once all partitions have bounds elements, the partitioner
- * allocates 1 element per partition so eventually the smaller partitions are at most
- * off by 1 key compared to the larger partitions.
- */
-class BoundaryPartitioner[K : Ordering : ClassTag, V](
-    partitions: Int,
-    @transient rdd: RDD[_ <: Product2[K,V]],
-    private val boundary: Int = 1000)
-  extends Partitioner {
-
-  // this array keeps track of keys assigned to a partition
-  // counts[0] refers to # of keys in partition 0 and so on
-  private val counts: Array[Int] = {
-    new Array[Int](numPartitions)
-  }
-
-  def numPartitions = math.abs(partitions)
-
-  /*
-  * Ideally, this should've been calculated based on # partitions and total keys
-  * But we are not calling count on RDD here to avoid calling an action.
-  * User has the flexibility of calling count and passing in any appropriate boundary
-  */
-  def keysPerPartition = boundary
-
-  var currPartition = 0
-
-  /*
-  * Pick current partition for the key until we hit the bound for keys / partition,
-  * start allocating to next partition at that time.
-  *
-  * NOTE: In case where we have lets say 2000 keys and user says 3 partitions with 500
-  * passed in as boundary, the first 500 will goto P1, 501-1000 go to P2, 1001-1500 go to P3,
-  * after that, next keys go to one partition at a time. So 1501 goes to P1, 1502 goes to P2,
-  * 1503 goes to P3 and so on.
-  */
-  def getPartition(key: Any): Int = {
-    val partition = currPartition
-    counts(partition) = counts(partition) + 1
-    /*
-    * Since we are filling up a partition before moving to next one (this helps in maintaining
-    * order of keys, in certain cases, it is possible to end up with empty partitions, like
-    * 3 partitions, 500 keys / partition and if rdd has 700 keys, 1 partition will be entirely
-    * empty.
-    */
-    if(counts(currPartition) >= keysPerPartition)
-      currPartition = (currPartition + 1) % numPartitions
-    partition
-  }
-
-  override def equals(other: Any): Boolean = other match {
-    case r: BoundaryPartitioner[_,_] =>
-      (r.counts.sameElements(counts) && r.boundary == boundary
-        && r.currPartition == currPartition)
-    case _ =>
-      false
-  }
-}
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index 7d40395803f02..7c30626a0c421 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,40 +66,6 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMet
     assert(descendingP4 != p4)
   }
 
-  test("BoundaryPartitioner equality") {
-    // Make an RDD where all the elements are the same so that the partition range bounds
-    // are deterministically all the same.
-    val rdd = sc.parallelize(1.to(4000)).map(x => (x, x))
-
-    val p2 = new BoundaryPartitioner(2, rdd, 1000)
-    val p4 = new BoundaryPartitioner(4, rdd, 1000)
-    val anotherP4 = new BoundaryPartitioner(4, rdd)
-
-    assert(p2 === p2)
-    assert(p4 === p4)
-    assert(p2 != p4)
-    assert(p4 != p2)
-    assert(p4 === anotherP4)
-    assert(anotherP4 === p4)
-  }
-
-  test("BoundaryPartitioner getPartition") {
-    val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
-    val partitioner = new BoundaryPartitioner(4, rdd, 500)
-    1.to(2000).map { element => {
-      val partition = partitioner.getPartition(element)
-      if (element <= 500) {
-        assert(partition === 0)
-      } else if (element > 501 && element <= 1000) {
-        assert(partition === 1)
-      } else if (element > 1001 && element <= 1500) {
-        assert(partition === 2)
-      } else if (element > 1501 && element <= 2000) {
-        assert(partition === 3)
-      }
-    }}
-  }
-
   test("RangePartitioner getPartition") {
     val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
     // We have different behaviour of getPartition for partitions with less than 1000 and more than

From 6ce0884446d3571fd6e9d967a080a59c657543b1 Mon Sep 17 00:00:00 2001
From: Michael Armbrust
Date: Tue, 13 May 2014 23:27:22 -0700
Subject: [PATCH 2/3] [SQL] Improve column pruning.

Fixed a bug that was preventing us from ever pruning beneath Joins.
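To make the effect concrete, here is a minimal, self-contained sketch of what "pruning beneath a Join" means: wrap each side of the Join in a Project that keeps only the columns referenced above the Join or by its condition. This is illustration only, not Catalyst code; the `Scan`/`Project`/`Join` toy classes and `pruneBelowJoin` are invented names.

```scala
// Toy model of column pruning beneath a Join (not Catalyst itself).
object ColumnPruningSketch {
  sealed trait Plan { def output: Set[String] }
  case class Scan(table: String, columns: Set[String]) extends Plan {
    def output: Set[String] = columns
  }
  case class Project(columns: Set[String], child: Plan) extends Plan {
    def output: Set[String] = columns
  }
  case class Join(left: Plan, right: Plan, conditionRefs: Set[String]) extends Plan {
    def output: Set[String] = left.output ++ right.output
  }

  /** Push required columns beneath a Join by projecting each side down to what is referenced. */
  def pruneBelowJoin(plan: Plan): Plan = plan match {
    case Project(columns, Join(left, right, conditionRefs)) =>
      // Everything needed above the Join, plus whatever the join condition reads.
      val required = columns ++ conditionRefs
      def pruned(side: Plan): Plan =
        if ((side.output -- required).nonEmpty) Project(side.output intersect required, side)
        else side
      Project(columns, Join(pruned(left), pruned(right), conditionRefs))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val item  = Scan("item", Set("i_item_sk", "i_brand_id", "i_brand", "i_manufact_id", "i_color"))
    val sales = Scan("store_sales", Set("ss_item_sk", "ss_ext_sales_price", "ss_sold_date_sk"))
    val plan  = Project(Set("i_brand", "ss_ext_sales_price"),
      Join(sales, item, Set("ss_item_sk", "i_item_sk")))
    // Each Scan is now wrapped in a Project that keeps only the referenced columns,
    // which is exactly the shape of the "After" plan below: narrow scans, narrow exchanges.
    println(pruneBelowJoin(plan))
  }
}
```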
## TPC-DS Q3
### Before:
```
Aggregate false, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand_id#64 AS brand_id#0,i_brand#65 AS brand#1,SUM(PartialSum#79) AS sum_agg#2]
 Exchange (HashPartitioning [d_year#12:0,i_brand#65:1,i_brand_id#64:2], 150)
  Aggregate true, [d_year#12,i_brand#65,i_brand_id#64], [d_year#12,i_brand#65,i_brand_id#64,SUM(CAST(ss_ext_sales_price#49, DoubleType)) AS PartialSum#79]
   Project [d_year#12:6,i_brand#65:59,i_brand_id#64:58,ss_ext_sales_price#49:43]
    HashJoin [ss_item_sk#36], [i_item_sk#57], BuildRight
     Exchange (HashPartitioning [ss_item_sk#36:30], 150)
      HashJoin [d_date_sk#6], [ss_sold_date_sk#34], BuildRight
       Exchange (HashPartitioning [d_date_sk#6:0], 150)
        Filter (d_moy#14:8 = 12)
         HiveTableScan [d_date_sk#6,d_date_id#7,d_date#8,d_month_seq#9,d_week_seq#10,d_quarter_seq#11,d_year#12,d_dow#13,d_moy#14,d_dom#15,d_qoy#16,d_fy_year#17,d_fy_quarter_seq#18,d_fy_week_seq#19,d_day_name#20,d_quarter_name#21,d_holiday#22,d_weekend#23,d_following_holiday#24,d_first_dom#25,d_last_dom#26,d_same_day_ly#27,d_same_day_lq#28,d_current_day#29,d_current_week#30,d_current_month#31,d_current_quarter#32,d_current_year#33], (MetastoreRelation default, date_dim, Some(dt)), None
       Exchange (HashPartitioning [ss_sold_date_sk#34:0], 150)
        HiveTableScan [ss_sold_date_sk#34,ss_sold_time_sk#35,ss_item_sk#36,ss_customer_sk#37,ss_cdemo_sk#38,ss_hdemo_sk#39,ss_addr_sk#40,ss_store_sk#41,ss_promo_sk#42,ss_ticket_number#43,ss_quantity#44,ss_wholesale_cost#45,ss_list_price#46,ss_sales_price#47,ss_ext_discount_amt#48,ss_ext_sales_price#49,ss_ext_wholesale_cost#50,ss_ext_list_price#51,ss_ext_tax#52,ss_coupon_amt#53,ss_net_paid#54,ss_net_paid_inc_tax#55,ss_net_profit#56], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#57:0], 150)
      Filter (i_manufact_id#70:13 = 436)
       HiveTableScan [i_item_sk#57,i_item_id#58,i_rec_start_date#59,i_rec_end_date#60,i_item_desc#61,i_current_price#62,i_wholesale_cost#63,i_brand_id#64,i_brand#65,i_class_id#66,i_class#67,i_category_id#68,i_category#69,i_manufact_id#70,i_manufact#71,i_size#72,i_formulation#73,i_color#74,i_units#75,i_container#76,i_manager_id#77,i_product_name#78], (MetastoreRelation default, item, None), None
```
### After
```
Aggregate false, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand_id#224 AS brand_id#160,i_brand#225 AS brand#161,SUM(PartialSum#239) AS sum_agg#162]
 Exchange (HashPartitioning [d_year#172:0,i_brand#225:1,i_brand_id#224:2], 150)
  Aggregate true, [d_year#172,i_brand#225,i_brand_id#224], [d_year#172,i_brand#225,i_brand_id#224,SUM(CAST(ss_ext_sales_price#209, DoubleType)) AS PartialSum#239]
   Project [d_year#172:1,i_brand#225:5,i_brand_id#224:3,ss_ext_sales_price#209:0]
    HashJoin [ss_item_sk#196], [i_item_sk#217], BuildRight
     Exchange (HashPartitioning [ss_item_sk#196:2], 150)
      Project [ss_ext_sales_price#209:2,d_year#172:1,ss_item_sk#196:3]
       HashJoin [d_date_sk#166], [ss_sold_date_sk#194], BuildRight
        Exchange (HashPartitioning [d_date_sk#166:0], 150)
         Project [d_date_sk#166:0,d_year#172:1]
          Filter (d_moy#174:2 = 12)
           HiveTableScan [d_date_sk#166,d_year#172,d_moy#174], (MetastoreRelation default, date_dim, Some(dt)), None
        Exchange (HashPartitioning [ss_sold_date_sk#194:2], 150)
         HiveTableScan [ss_ext_sales_price#209,ss_item_sk#196,ss_sold_date_sk#194], (MetastoreRelation default, store_sales, None), None
     Exchange (HashPartitioning [i_item_sk#217:1], 150)
      Project [i_brand_id#224:0,i_item_sk#217:1,i_brand#225:2]
       Filter (i_manufact_id#230:3 = 436)
        HiveTableScan [i_brand_id#224,i_item_sk#217,i_brand#225,i_manufact_id#230], (MetastoreRelation default, item, None), None
```

Author: Michael Armbrust

Closes #729 from marmbrus/fixPruning and squashes the following commits:

5feeff0 [Michael Armbrust] Improve column pruning.
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 3037d45cc6e35..406ffd6801e98 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -25,13 +25,13 @@ import org.apache.spark.sql.catalyst.types._
 
 object Optimizer extends RuleExecutor[LogicalPlan] {
   val batches =
-    Batch("ConstantFolding", Once,
+    Batch("ConstantFolding", FixedPoint(100),
       NullPropagation,
       ConstantFolding,
       BooleanSimplification,
       SimplifyFilters,
       SimplifyCasts) ::
-    Batch("Filter Pushdown", Once,
+    Batch("Filter Pushdown", FixedPoint(100),
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughInnerJoin,
@@ -49,17 +49,19 @@ object Optimizer extends RuleExecutor[LogicalPlan] {
  */
 object ColumnPruning extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // Eliminate attributes that are not needed to calculate the specified aggregates.
     case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
-      // Project away references that are not needed to calculate the required aggregates.
       a.copy(child = Project(a.references.toSeq, child))
+
+    // Eliminate unneeded attributes from either side of a Join.
     case Project(projectList, Join(left, right, joinType, condition)) =>
       // Collect the list of off references required either above or to evaluate the condition.
       val allReferences: Set[Attribute] =
         projectList.flatMap(_.references).toSet ++ condition.map(_.references).getOrElse(Set.empty)
-      /** Applies a projection when the child is producing unnecessary attributes */
+
+      /** Applies a projection only when the child is producing unnecessary attributes */
       def prunedChild(c: LogicalPlan) =
-        if ((allReferences.filter(c.outputSet.contains) -- c.outputSet).nonEmpty) {
+        if ((c.outputSet -- allReferences.filter(c.outputSet.contains)).nonEmpty) {
           Project(allReferences.filter(c.outputSet.contains).toSeq, c)
         } else {
           c
@@ -67,6 +69,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
       Project(projectList,
         Join(prunedChild(left), prunedChild(right), joinType, condition))
 
+    // Combine adjacent Projects.
     case Project(projectList1, Project(projectList2, child)) =>
       // Create a map of Aliases to their values from the child projection.
       // e.g., 'SELECT ... FROM (SELECT a + b AS c, d ...)' produces Map(c -> Alias(a + b, c)).
@@ -83,6 +86,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
       }).asInstanceOf[Seq[NamedExpression]]
 
       Project(substitutedProjection, child)
+
+    // Eliminate no-op Projects
+    case Project(projectList, child) if(child.output == projectList) => child
   }
 }

From b22952fa1f21c0b93208846b5e1941a9d2578c6f Mon Sep 17 00:00:00 2001
From: Koert Kuipers
Date: Wed, 14 May 2014 00:10:12 -0700
Subject: [PATCH 3/3] SPARK-1801. expose InterruptibleIterator and TaskKilledException in deve...

...loper api

Author: Koert Kuipers

Closes #764 from koertkuipers/feat-rdd-developerapi and squashes the following commits:

8516dd2 [Koert Kuipers] SPARK-1801. expose InterruptibleIterator and TaskKilledException in developer api
---
 .../scala/org/apache/spark/InterruptibleIterator.scala    | 6 +++++-
 .../main/scala/org/apache/spark/TaskKilledException.scala | 8 ++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
index ec11dbbffaaf8..f40baa8e43592 100644
--- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
+++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -17,11 +17,15 @@
 
 package org.apache.spark
 
+import org.apache.spark.annotation.DeveloperApi
+
 /**
+ * :: DeveloperApi ::
  * An iterator that wraps around an existing iterator to provide task killing functionality.
  * It works by checking the interrupted flag in [[TaskContext]].
  */
-private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
+@DeveloperApi
+class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
   extends Iterator[T] {
 
   def hasNext: Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/TaskKilledException.scala b/core/src/main/scala/org/apache/spark/TaskKilledException.scala
index cbd6b2866e4f9..ad487c4efb87a 100644
--- a/core/src/main/scala/org/apache/spark/TaskKilledException.scala
+++ b/core/src/main/scala/org/apache/spark/TaskKilledException.scala
@@ -17,7 +17,11 @@
 
 package org.apache.spark
 
+import org.apache.spark.annotation.DeveloperApi
+
 /**
- * Exception for a task getting killed.
+ * :: DeveloperApi ::
+ * Exception thrown when a task is explicitly killed (i.e., task failure is expected).
  */
-private[spark] class TaskKilledException extends RuntimeException
+@DeveloperApi
+class TaskKilledException extends RuntimeException
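The two classes exposed here implement a simple cooperative-cancellation pattern: the wrapper re-checks a kill flag between elements and throws to unwind the task. A minimal stand-alone sketch of that pattern, with no Spark dependency, follows; the names `KilledException` and `CancellableIterator` are invented for illustration, while the real classes check the interrupted flag in `TaskContext` and live in `org.apache.spark`.

```scala
// Stand-alone sketch of the cooperative-cancellation pattern (illustrative names only).
import java.util.concurrent.atomic.AtomicBoolean

object InterruptibleIteratorSketch {
  class KilledException extends RuntimeException("task killed")

  /** Wraps an iterator and aborts between elements once `interrupted` is set. */
  class CancellableIterator[T](interrupted: AtomicBoolean, delegate: Iterator[T])
    extends Iterator[T] {
    def hasNext: Boolean = {
      // Checking only in hasNext keeps next() cheap; cancellation takes effect
      // at the next element boundary, mirroring the approach described above.
      if (interrupted.get()) throw new KilledException
      delegate.hasNext
    }
    def next(): T = delegate.next()
  }

  def main(args: Array[String]): Unit = {
    val flag = new AtomicBoolean(false)
    val it = new CancellableIterator(flag, Iterator.from(1))
    println(it.take(3).toList)          // List(1, 2, 3)
    flag.set(true)                      // e.g. set from another thread killing the task
    try { while (it.hasNext) println(it.next()) }
    catch { case _: KilledException => println("iteration stopped cooperatively") }
  }
}
```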