Only trigger Comet Final aggregation on Comet partial aggregation #262

Closed
viirya opened this issue Apr 12, 2024 · 0 comments · Fixed by #264
Labels
bug Something isn't working

Comments


viirya commented Apr 12, 2024

Describe the bug

Related to #250.

When columnar shuffle is enabled, a partial Spark aggregation could be upstream of a Comet final aggregation, because the columnar shuffle can serve as the scan source of a Comet native operator.

However, there is a query failure in SQLQuerySuite's SPARK-3176 Added Parser of SQL LAST() test:

[info]   == Physical Plan ==                                                                                                                                                                                                           
[info]   AdaptiveSparkPlan isFinalPlan=true                                                                                                                                                                                            
[info]   +- == Final Plan ==                                                                                                                                                                                                           
[info]      *(2) ColumnarToRow                                                                                                                                                                                                         
[info]      +- CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]                                                                                                                                               
[info]         +- ShuffleQueryStage 0
[info]            +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10390]
[info]               +- RowToColumnar
[info]                  +- *(1) HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])                                                                                                       
[info]                     +- *(1) SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]                                                                    
[info]                        +- Scan[obj#92]                                                                                                                                                                                          
[info]   +- == Initial Plan ==                                                                                                                                                                                                         
[info]      CometHashAggregate [last#4396, valueSet#4397], Final, [last(n#93, false)]                                                                                                                                                  
[info]      +- CometColumnarExchange SinglePartition, ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=10279]                                                                                                                       
[info]         +- HashAggregate(keys=[], functions=[partial_last(n#93, false)], output=[last#4396, valueSet#4397])                                                                                                                     
[info]            +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.sql.test.SQLTestData$LowerCaseData, true])).n AS n#93]                                                                                  
[info]               +- Scan[obj#92]                                                                                                                                                                                                   
[info]                                                                                                                                                                                                                                 
[info]   == Results ==                                                                                                                                                                                                                 
[info]   !== Correct Answer - 1 ==   == Spark Answer - 1 ==
[info]   !struct<>                   struct<last(n):int>
[info]   ![4]                        [2] (QueryTest.scala:243)

This is because the aggregation buffer attributes (i.e., the aggregation state) of Spark's Last aggregate expression are different from those of DataFusion's Last aggregate expression.

Spark Last has:

override lazy val aggBufferAttributes: Seq[AttributeReference] = last :: valueSet :: Nil

DataFusion's Last:

fn state_fields(&self) -> Result<Vec<Field>> {
    let mut fields = vec![Field::new(
        format_state_name(&self.name, "last_value"),
        self.input_data_type.clone(),
        true,
    )];
    fields.extend(ordering_fields(
        &self.ordering_req,
        &self.order_by_data_types,
    ));
    fields.push(Field::new(
        format_state_name(&self.name, "is_set"),
        DataType::Boolean,
        true,
    ));
    Ok(fields)
}

I think this kind of issue will become more and more common. Re-implementing such aggregate expressions in Comet to match Spark's state layout seems too costly to develop and maintain. These cases only arise when the partial Spark aggregation cannot be transformed to Comet; in the failed query, that is because its upstream is not a Comet plan. These cases are not what we mostly care about.

I think we should only use a Comet final aggregation when the partial aggregation is also Comet. That will simplify things.
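The proposed rule can be sketched as a planner guard. This is a minimal, self-contained illustration, not Comet's actual rule or API: the plan node types and the `convertFinal` helper below are hypothetical stand-ins. The idea is simply that a Final-mode aggregation is converted to Comet only when a Comet partial aggregation exists below it, so both stages share the same aggregation state layout.

```scala
// Hypothetical simplified plan tree (not Comet's real classes).
sealed trait Plan { def children: Seq[Plan] }
case class SparkHashAggregate(mode: String, children: Seq[Plan]) extends Plan
case class CometHashAggregate(mode: String, children: Seq[Plan]) extends Plan
case class Exchange(children: Seq[Plan]) extends Plan

// True if a Comet partial aggregation exists anywhere below this node.
def partialIsComet(plan: Plan): Boolean = plan match {
  case CometHashAggregate("Partial", _) => true
  case other => other.children.exists(partialIsComet)
}

// Convert a Final aggregation to Comet only when the partial side is
// already Comet; otherwise keep Spark's Final aggregation, because the
// two engines' aggregation state layouts would not match.
def convertFinal(agg: SparkHashAggregate): Plan =
  if (agg.mode == "Final" && agg.children.exists(partialIsComet))
    CometHashAggregate("Final", agg.children)
  else
    agg
```

Under this guard, the failing plan above (Spark partial HashAggregate feeding a Comet Final aggregation through CometColumnarExchange) would keep a Spark Final aggregation and avoid the state mismatch.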

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response
