Skip to content

Commit

Permalink
[UDF] JSON Array Contains function (#608)
Browse files Browse the repository at this point in the history
* Add JSON Array Contains function

* Add parametrization support

* Add @test annotation

* Add parametrization support

* Add function reference to doc

* Fix left brace style issue

* Rename test functions

* Remove unnecessary imports
  • Loading branch information
satybald authored and dguy committed Jan 25, 2018
1 parent 30b6125 commit 15dcbc7
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/syntax-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ Terminate a persistent query. Persistent queries run continuously until they are
| CEIL | `CEIL(col1)` | The ceiling of a value |
| CONCAT | `CONCAT(col1, '_hello')` | Concatenate two strings |
| EXTRACTJSONFIELD | `EXTRACTJSONFIELD(message, '$.log.cloud')` | Given a string column in JSON format, extract the field that matches |the given pattern.
| ARRAYCONTAINS | `ARRAYCONTAINS('[1, 2, 3]', 3)` | Given JSON or AVRO array checks if a search value contains in it. |
| FLOOR | `FLOOR(col1)` | The floor of a value |
| LCASE | `LCASE(col1)` | Convert a string to lowercase |
| LEN | `LEN(col1)` | The length of a string |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.confluent.ksql.function.udaf.topkdistinct.TopkDistinctAggFunctionDeterminer;
import io.confluent.ksql.function.udf.datetime.StringToTimestamp;
import io.confluent.ksql.function.udf.datetime.TimestampToString;
import io.confluent.ksql.function.udf.json.ArrayContainsKudf;
import io.confluent.ksql.function.udf.json.JsonExtractStringKudf;
import io.confluent.ksql.function.udf.math.AbsKudf;
import io.confluent.ksql.function.udf.math.CeilKudf;
Expand All @@ -41,6 +42,7 @@
import io.confluent.ksql.util.ExpressionTypeManager;
import io.confluent.ksql.util.KsqlException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -153,6 +155,27 @@ private void init() {
"EXTRACTJSONFIELD", JsonExtractStringKudf.class);
addFunction(getStringFromJson);

KsqlFunction jsonArrayContainsString = new KsqlFunction(
Schema.BOOLEAN_SCHEMA, Arrays.asList(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA),
"ARRAYCONTAINS", ArrayContainsKudf.class);
addFunction(jsonArrayContainsString);

addFunction(new KsqlFunction(
Schema.BOOLEAN_SCHEMA, Arrays.asList(SchemaBuilder.array(Schema.STRING_SCHEMA).build(), Schema.STRING_SCHEMA),
"ARRAYCONTAINS", ArrayContainsKudf.class));

addFunction(new KsqlFunction(
Schema.BOOLEAN_SCHEMA, Arrays.asList(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Schema.INT32_SCHEMA),
"ARRAYCONTAINS", ArrayContainsKudf.class));

addFunction(new KsqlFunction(
Schema.BOOLEAN_SCHEMA, Arrays.asList(SchemaBuilder.array(Schema.INT64_SCHEMA).build(), Schema.INT64_SCHEMA),
"ARRAYCONTAINS", ArrayContainsKudf.class));

addFunction(new KsqlFunction(
Schema.BOOLEAN_SCHEMA, Arrays.asList(SchemaBuilder.array(Schema.FLOAT64_SCHEMA).build(), Schema.FLOAT64_SCHEMA),
"ARRAYCONTAINS", ArrayContainsKudf.class));


/***************************************
* UDAFs *
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Kudf;
import io.confluent.ksql.util.ArrayUtil;
import io.confluent.ksql.util.KsqlException;

import java.io.IOException;
import java.util.Arrays;

import static com.fasterxml.jackson.core.JsonFactory.Feature.CANONICALIZE_FIELD_NAMES;
import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
import static com.fasterxml.jackson.core.JsonToken.VALUE_NULL;
import static com.fasterxml.jackson.core.JsonToken.VALUE_NUMBER_FLOAT;
import static com.fasterxml.jackson.core.JsonToken.VALUE_NUMBER_INT;
import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;

public class ArrayContainsKudf
implements Kudf {
private static final JsonFactory JSON_FACTORY = new JsonFactory()
.disable(CANONICALIZE_FIELD_NAMES);

@Override
public void init() {
}

@Override
public Object evaluate(Object... args) {
if (args.length != 2) {
throw new KsqlFunctionException("ARRAY_CONTAINS udf should have two input argument. Given: " + Arrays.toString(args));
}
Object searchValue = args[1];
if(args[0] instanceof String) {
return jsonStringArrayContains(searchValue, (String) args[0]);
} else if(args[0] instanceof Object[]) {
return ArrayUtil.containsValue(searchValue, (Object[]) args[0]);
}
throw new KsqlFunctionException("Invalid type parameters for " + Arrays.toString(args));
}

private boolean jsonStringArrayContains(Object searchValue, String jsonArray) {
JsonToken valueType = getType(searchValue);
try (JsonParser parser = JSON_FACTORY.createParser(jsonArray)) {
if (parser.nextToken() != START_ARRAY) {
return false;
}

while (parser.currentToken() != null) {
JsonToken token = parser.nextToken();
if (token == null) {
return searchValue == null;
}
if (token == END_ARRAY) {
return false;
}
parser.skipChildren();
if (valueType == token) {
if (valueType == VALUE_NULL && searchValue == null) {
return true;
} else if ((valueType == VALUE_STRING)
&& parser.getText().equals(searchValue)) {
return true;
} else if((valueType == VALUE_FALSE || valueType == VALUE_TRUE)
&& (parser.getBooleanValue() == (boolean)searchValue)) {
return true;
} else if((valueType == VALUE_NUMBER_INT)) {
if(searchValue instanceof Integer && parser.getIntValue() == (int) searchValue) {
return true;
} else if(searchValue instanceof Long && parser.getLongValue() == (long) searchValue) {
return true;
}
} else if((valueType == VALUE_NUMBER_FLOAT)
&& parser.getDoubleValue() == (double)searchValue) {
return true;
}
}
}
}
catch (IOException e) {
throw new KsqlException("Invalid JSON format: " + jsonArray, e);
}
return false;
}

/**
* Returns JsonToken type of the targetValue
*/
private JsonToken getType(Object searchValue) {
if(searchValue instanceof Long || searchValue instanceof Integer) {
return VALUE_NUMBER_INT;
} else if(searchValue instanceof Double) {
return VALUE_NUMBER_FLOAT;
} else if(searchValue instanceof String) {
return VALUE_STRING;
} else if(searchValue == null) {
return VALUE_NULL;
} else if(searchValue instanceof Boolean) {
boolean value = (boolean) searchValue;
return value ? VALUE_TRUE : VALUE_FALSE;
}
throw new KsqlFunctionException("Invalid Type for search value " + searchValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package io.confluent.ksql.util;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Objects;

public class ArrayUtil {

Expand Down Expand Up @@ -56,11 +58,6 @@ public static <T> T[] padWithNull(Class<T> clazz, T[] array, int finalLength) {
}

public static <T> boolean containsValue(T value, T[] array) {
for (T v: array) {
if (v != null && v.equals(value)) {
return true;
}
}
return false;
return Arrays.stream(array).anyMatch(o -> Objects.equals(o, value));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package io.confluent.ksql.function.udf.json;

import org.junit.Before;
import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class ArrayContainsKudfTest
{
private ArrayContainsKudf jsonUdf = new ArrayContainsKudf();

@Before
public void setUp() {
jsonUdf.init();
}

@Test
public void shouldReturnFalseOnEmptyArray() {
assertEquals(false, jsonUdf.evaluate("[]", true));
assertEquals(false, jsonUdf.evaluate("[]", false));
assertEquals(false, jsonUdf.evaluate("[]", null));
assertEquals(false, jsonUdf.evaluate("[]", 1.0));
assertEquals(false, jsonUdf.evaluate("[]", 100));
assertEquals(false, jsonUdf.evaluate("[]", "abc"));
assertEquals(false, jsonUdf.evaluate("[]", ""));
}

@Test
public void shouldNotFindValuesInNullArray() {
assertEquals(true, jsonUdf.evaluate("[null]", null));
assertEquals(false, jsonUdf.evaluate("[null]", "null"));
assertEquals(false, jsonUdf.evaluate("[null]", true));
assertEquals(false, jsonUdf.evaluate("[null]", false));
assertEquals(false, jsonUdf.evaluate("[null]", 1.0));
assertEquals(false, jsonUdf.evaluate("[null]", 100));
assertEquals(false, jsonUdf.evaluate("[null]", "abc"));
assertEquals(false, jsonUdf.evaluate("[null]", ""));
}

@Test
public void shouldFindIntegersInJsonArray() {
String json = "[2147483647, {\"ab\":null }, -2147483648, 1, 2, 3, null, [4], 4]";
assertEquals(true, jsonUdf.evaluate(json, 2147483647));
assertEquals(true, jsonUdf.evaluate(json, -2147483648));
assertEquals(true, jsonUdf.evaluate(json, 1));
assertEquals(true, jsonUdf.evaluate(json, 2));
assertEquals(true, jsonUdf.evaluate(json, 3));
assertEquals(false, jsonUdf.evaluate("5", 5));
assertEquals(false, jsonUdf.evaluate(json, 5));
}

@Test
public void shouldFindLongsInJsonArray() {
assertEquals(true, jsonUdf.evaluate("[1]", 1L));
assertEquals(true, jsonUdf.evaluate("[1111111111111111]", 1111111111111111L));
assertEquals(true, jsonUdf.evaluate("[[222222222222222], 33333]", 33333L));
assertEquals(true, jsonUdf.evaluate("[{}, \"abc\", null, 1]", 1L));
assertEquals(false, jsonUdf.evaluate("[[222222222222222], 33333]", 222222222222222L));
assertEquals(false, jsonUdf.evaluate("[{}, \"abc\", null, [1]]", 1L));
assertEquals(false, jsonUdf.evaluate("[{}, \"abc\", null, {\"1\":1}]", 1L));
}

@Test
public void shouldFindDoublesInJsonArray() {
assertEquals(true, jsonUdf.evaluate("[-1.0, 2.0, 3.0]", 2.0));
assertEquals(true, jsonUdf.evaluate("[1.0, -2.0, 3.0]", -2.0));
assertEquals(true, jsonUdf.evaluate("[1.0, 2.0, 1.6E3]", 1600.0));
assertEquals(true, jsonUdf.evaluate("[1.0, 2.0, -1.6E3]", -1600.0));
assertEquals(true, jsonUdf.evaluate("[{}, \"abc\", null, -2.0]", -2.0));
assertEquals(false, jsonUdf.evaluate("[[2.0], 3.0]", 2.0));
}

@Test
public void shouldFindStringsInJsonArray() {
assertEquals(true, jsonUdf.evaluate("[\"abc\"]", "abc"));
assertEquals(true, jsonUdf.evaluate("[\"cbda\", \"abc\"]", "abc"));
assertEquals(true, jsonUdf.evaluate("[{}, \"abc\", null, 1]", "abc"));
assertEquals(true, jsonUdf.evaluate("[\"\"]", ""));
assertEquals(false, jsonUdf.evaluate("[\"\"]", null));
assertEquals(false, jsonUdf.evaluate("[1,2,3]", "1"));
assertEquals(false, jsonUdf.evaluate("[null]", ""));
assertEquals(false, jsonUdf.evaluate("[\"abc\", \"dba\"]", "abd"));
}

@Test
public void shouldFindBooleansInJsonArray() {
assertEquals(true, jsonUdf.evaluate("[false, false, true, false]", true));
assertEquals(true, jsonUdf.evaluate("[true, true, false]", false));
assertEquals(false, jsonUdf.evaluate("[true, true]", false));
assertEquals(false, jsonUdf.evaluate("[false, false]", true));
}

@Test
public void shouldFindStringInAvroArray() {
assertEquals(true, jsonUdf.evaluate(new String[]{"abc", "bd", "DC"}, "DC"));
assertEquals(false, jsonUdf.evaluate(new String[]{"abc", "bd", "DC"}, "dc"));
assertEquals(false, jsonUdf.evaluate(new String[]{"abc", "bd", "1"}, 1));
}

@Test
public void shouldFindIntegersInAvroArray() {
assertEquals(true, jsonUdf.evaluate(new Integer[]{1, 2, 3}, 2));
assertEquals(false, jsonUdf.evaluate(new Integer[]{1, 2, 3}, 0));
assertEquals(false, jsonUdf.evaluate(new Integer[]{1, 2, 3}, "1"));
assertEquals(false, jsonUdf.evaluate(new Integer[]{1, 2, 3}, "aa"));
}

@Test
public void shouldFindLongInAvroArray() {
assertEquals(true, jsonUdf.evaluate(new Long[]{1L, 2L, 3L}, 2L));
assertEquals(false, jsonUdf.evaluate(new Long[]{1L, 2L, 3L}, 0L));
assertEquals(false, jsonUdf.evaluate(new Long[]{1L, 2L, 3L}, "1"));
assertEquals(false, jsonUdf.evaluate(new Long[]{1L, 2L, 3L}, "aaa"));
}

@Test
public void shouldFindDoublesInAvroArray() {
assertEquals(true, jsonUdf.evaluate(new Double[]{1.0, 2.0, 3.0}, 2.0));
assertEquals(false, jsonUdf.evaluate(new Double[]{1.0, 2.0, 3.0}, 4.0));
assertEquals(false, jsonUdf.evaluate(new Double[]{1.0, 2.0, 3.0}, "1"));
assertEquals(false, jsonUdf.evaluate(new Double[]{1.0, 2.0, 3.0}, "aaa"));
}
}

0 comments on commit 15dcbc7

Please sign in to comment.