Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ESQL: COALESCE function #98542

Merged
merged 10 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/reference/esql/esql-functions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ these functions:
* <<esql-auto_bucket>>
* <<esql-case>>
* <<esql-cidr_match>>
* <<esql-coalesce>>
* <<esql-concat>>
* <<esql-cos>>
* <<esql-cosh>>
Expand Down Expand Up @@ -72,6 +73,7 @@ include::functions/atan2.asciidoc[]
include::functions/auto_bucket.asciidoc[]
include::functions/case.asciidoc[]
include::functions/cidr_match.asciidoc[]
include::functions/coalesce.asciidoc[]
include::functions/concat.asciidoc[]
include::functions/cos.asciidoc[]
include::functions/cosh.asciidoc[]
Expand Down
13 changes: 13 additions & 0 deletions docs/reference/esql/functions/coalesce.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[[esql-coalesce]]
=== `COALESCE`

Returns the first non-null value.

[source.merge.styled,esql]
----
include::{esql-specs}/nulls.csv-spec[tag=coalesce]
----
[%header.monospaced.styled,format=dsv,separator=|]
|===
include::{esql-specs}/null.csv-spec[tag=coalesce-result]
|===
Original file line number Diff line number Diff line change
Expand Up @@ -89,29 +89,3 @@ M |10
M |10
M |10
;

isNull
from employees
| where is_null(gender)
| sort first_name
| keep first_name, gender
| limit 3;

first_name:keyword|gender:keyword
Berni |null
Cristinel |null
Duangkaew |null
;

notIsNull
from employees
| where not is_null(gender)
| sort first_name
| keep first_name, gender
| limit 3;

first_name:keyword|gender:keyword
Alejandro |F
Amabile |M
Anneke |F
;
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
isNull
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though Coalesce can have an arbitrary number of parameters, the tests here always use two.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

y=COALESCE(true, false, null) doesn't seem to work. The value for y is null. Only if I the last value is non-null it works.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh. I thought I had tests for all that in the unit tests. I'll add more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting! the break here was with an optimizer rule. I'll push a fix.

from employees
| where is_null(gender)
| sort first_name
| keep first_name, gender
| limit 3;

first_name:keyword|gender:keyword
Berni |null
Cristinel |null
Duangkaew |null
;

notIsNull
from employees
| where not is_null(gender)
| sort first_name
| keep first_name, gender
| limit 3;

first_name:keyword|gender:keyword
Alejandro |F
Amabile |M
Anneke |F
;

coalesceSimple
// tag::coalesce[]
ROW a=null, b="b"
| EVAL COALESCE(a, b)
// end::coalesce[]
;

// tag::coalesce-result[]
a:null | b:keyword | COALESCE(a,b):keyword
null | b | b
// end::coalesce-result[]
;

coalesce
FROM employees
| EVAL first_name = COALESCE(first_name, "X")
| SORT first_name DESC, emp_no ASC
| KEEP emp_no, first_name
| limit 10;

emp_no:integer | first_name:keyword
10047 | Zvonko
10081 | Zhongwei
10026 | Yongqiao
10043 | Yishay
10050 | Yinghua
10087 | Xinglin
10030 | X
10031 | X
10032 | X
10033 | X
;

coalesceBackwards
FROM employees
| EVAL first_name = COALESCE("X", first_name)
| SORT first_name DESC, emp_no ASC
| KEEP emp_no, first_name
| limit 10;

emp_no:integer | first_name:keyword
10001 | X
10002 | X
10003 | X
10004 | X
10005 | X
10006 | X
10007 | X
10008 | X
10009 | X
10010 | X
;

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ auto_bucket |auto_bucket(arg1, arg2, arg3, arg4)
avg |avg(arg1)
case |case(arg1...)
cidr_match |cidr_match(arg1, arg2...)
coalesce |coalesce(arg1...)
concat |concat(arg1, arg2...)
cos |cos(arg1)
cosh |cosh(arg1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMedian;
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin;
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvSum;
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.Concat;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.Length;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.Split;
Expand Down Expand Up @@ -140,7 +141,9 @@ private FunctionDefinition[][] functions() {
def(DateTrunc.class, DateTrunc::new, "date_trunc"),
def(Now.class, Now::new, "now") },
// conditional
new FunctionDefinition[] { def(Case.class, Case::new, "case"), def(IsNull.class, IsNull::new, "is_null"), },
new FunctionDefinition[] { def(Case.class, Case::new, "case") },
// null
new FunctionDefinition[] { def(Coalesce.class, Coalesce::new, "coalesce"), def(IsNull.class, IsNull::new, "is_null"), },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just removed the is_null / is_not_null functions. The same functionality is now available with IS NULL / IS NOT NULL.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

++. I'll get that merged.

// IP
new FunctionDefinition[] { def(CIDRMatch.class, CIDRMatch::new, "cidr_match") },
// conversion functions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.esql.expression.function.scalar.nulls;

import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.ElementType;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.EvalOperator;
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.Mappable;
import org.elasticsearch.xpack.ql.expression.Expression;
import org.elasticsearch.xpack.ql.expression.Nullability;
import org.elasticsearch.xpack.ql.expression.TypeResolutions;
import org.elasticsearch.xpack.ql.expression.function.scalar.ScalarFunction;
import org.elasticsearch.xpack.ql.expression.gen.script.ScriptTemplate;
import org.elasticsearch.xpack.ql.tree.NodeInfo;
import org.elasticsearch.xpack.ql.tree.Source;
import org.elasticsearch.xpack.ql.type.DataType;

import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static org.elasticsearch.xpack.ql.type.DataTypes.NULL;

/**
* Function returning the first non-null value.
*/
public class Coalesce extends ScalarFunction implements Mappable {
private DataType dataType;

public Coalesce(Source source, List<Expression> expressions) {
super(source, expressions);
}

@Override
public DataType dataType() {
if (dataType == null) {
resolveType();
}
return dataType;
}

@Override
protected TypeResolution resolveType() {
if (childrenResolved() == false) {
return new TypeResolution("Unresolved children");
}

for (int position = 0; position < children().size(); position++) {
if (dataType == null || dataType == NULL) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this check we have Expressions.isNull() that does a similar thing.

dataType = children().get(position).dataType();
continue;
}
TypeResolution resolution = TypeResolutions.isType(
children().get(position),
t -> t == dataType,
sourceText(),
TypeResolutions.ParamOrdinal.fromIndex(position),
dataType.typeName()
);
if (resolution.unresolved()) {
return resolution;
}
}
return TypeResolution.TYPE_RESOLVED;
}

@Override
public Nullability nullable() {
return children().get(children().size() - 1).nullable();
}

@Override
public ScriptTemplate asScript() {
throw new UnsupportedOperationException();
}

@Override
public Expression replaceChildren(List<Expression> newChildren) {
return new Coalesce(source(), newChildren);
}

@Override
protected NodeInfo<? extends Expression> info() {
return NodeInfo.create(this, Coalesce::new, children());
}

@Override
public boolean foldable() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ES SQL there is a Coalesce function which is interesting because it's also coming together with an Optimizer rule and the foldable method is much simpler (thanks to the aforementioned rule).

Otherwise, there is Expressions.foldable(children()); that can be used here.

The way I see it:

  • either you merge this as is and create a GH issue to explore/improve NULL propagation for Coalesce (plus that optimizer rule)
  • have a look at Coalesce in ES SQL and SQL's Optimizer.PropagateNullable and have an adapted version in ESQL

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll open #98612 and replace with Expressions.foldable(children()) and we'll grab an optimizer rule later.

for (Expression c : children()) {
if (c.foldable() == false) {
return false;
}
if (c.fold() != null) { // TODO is this ok?
return true;
}
}
return true;
}

@Override
public Object fold() {
return Mappable.super.fold();
}

@Override
public Supplier<EvalOperator.ExpressionEvaluator> toEvaluator(
Function<Expression, Supplier<EvalOperator.ExpressionEvaluator>> toEvaluator
) {
List<Supplier<EvalOperator.ExpressionEvaluator>> evaluatorSuppliers = children().stream().map(toEvaluator).toList();
return () -> new CoalesceEvaluator(
LocalExecutionPlanner.toElementType(dataType()),
evaluatorSuppliers.stream().map(Supplier::get).toList()
);
}

private record CoalesceEvaluator(ElementType resultType, List<EvalOperator.ExpressionEvaluator> evaluators)
implements
EvalOperator.ExpressionEvaluator {
@Override
public Block eval(Page page) {
// Evaluate row at a time for now because its simpler. Much slower. But simpler.
int positionCount = page.getPositionCount();
Block.Builder result = resultType.newBlockBuilder(positionCount);
position: for (int p = 0; p < positionCount; p++) {
int[] positions = new int[] { p };
Page limited = new Page(
1,
IntStream.range(0, page.getBlockCount()).mapToObj(b -> page.getBlock(b).filter(positions)).toArray(Block[]::new)
);
for (EvalOperator.ExpressionEvaluator eval : evaluators) {
Block e = eval.eval(limited);
if (false == e.isNull(0)) {
result.copyFrom(e, 0, 1);
continue position;
}
}
result.appendNull();
}
return result.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMedian;
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvMin;
import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvSum;
import org.elasticsearch.xpack.esql.expression.function.scalar.nulls.Coalesce;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.Concat;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.Length;
import org.elasticsearch.xpack.esql.expression.function.scalar.string.Split;
Expand Down Expand Up @@ -321,6 +322,7 @@ public static List<PlanNameRegistry.Entry> namedTypeEntries() {
of(ScalarFunction.class, AutoBucket.class, PlanNamedTypes::writeAutoBucket, PlanNamedTypes::readAutoBucket),
of(ScalarFunction.class, Case.class, PlanNamedTypes::writeCase, PlanNamedTypes::readCase),
of(ScalarFunction.class, CIDRMatch.class, PlanNamedTypes::writeCIDRMatch, PlanNamedTypes::readCIDRMatch),
of(ScalarFunction.class, Coalesce.class, PlanNamedTypes::writeCoalesce, PlanNamedTypes::readCoalesce),
of(ScalarFunction.class, Concat.class, PlanNamedTypes::writeConcat, PlanNamedTypes::readConcat),
of(ScalarFunction.class, DateExtract.class, PlanNamedTypes::writeDateExtract, PlanNamedTypes::readDateExtract),
of(ScalarFunction.class, DateFormat.class, PlanNamedTypes::writeDateFormat, PlanNamedTypes::readDateFormat),
Expand Down Expand Up @@ -1126,6 +1128,14 @@ static void writeCase(PlanStreamOutput out, Case caseValue) throws IOException {
out.writeCollection(caseValue.children(), writerFromPlanWriter(PlanStreamOutput::writeExpression));
}

static Coalesce readCoalesce(PlanStreamInput in) throws IOException {
return new Coalesce(Source.EMPTY, in.readList(readerFromPlanReader(PlanStreamInput::readExpression)));
}

static void writeCoalesce(PlanStreamOutput out, Coalesce coalesce) throws IOException {
out.writeCollection(coalesce.children(), writerFromPlanWriter(PlanStreamOutput::writeExpression));
}

static Concat readConcat(PlanStreamInput in) throws IOException {
return new Concat(Source.EMPTY, in.readExpression(), in.readList(readerFromPlanReader(PlanStreamInput::readExpression)));
}
Expand Down
Loading