Skip to content

Commit

Permalink
Add JSON Array Contains function
Browse files Browse the repository at this point in the history
  • Loading branch information
Sayat Satybaldiyev committed Jan 9, 2018
1 parent f0117cf commit ef64f44
Show file tree
Hide file tree
Showing 3 changed files with 222 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.confluent.ksql.function.udaf.sum.SumAggFunctionDeterminer;
import io.confluent.ksql.function.udf.datetime.StringToTimestamp;
import io.confluent.ksql.function.udf.datetime.TimestampToString;
import io.confluent.ksql.function.udf.json.JsonArrayContainsKudf;
import io.confluent.ksql.function.udf.json.JsonExtractStringKudf;
import io.confluent.ksql.function.udf.math.AbsKudf;
import io.confluent.ksql.function.udf.math.CeilKudf;
Expand All @@ -39,6 +40,7 @@
import io.confluent.ksql.util.ExpressionTypeManager;
import io.confluent.ksql.util.KsqlException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -151,6 +153,12 @@ private void init() {
"EXTRACTJSONFIELD", JsonExtractStringKudf.class);
addFunction(getStringFromJson);

Schema array = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
KsqlFunction jsonArrayContainsString = new KsqlFunction(
Schema.BOOLEAN_SCHEMA, Arrays.asList(array, Schema.STRING_SCHEMA),
"JSON_ARRAY_CONTAINS", JsonArrayContainsKudf.class);
addFunction(jsonArrayContainsString);


/***************************************
* UDAFs *
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/**
* Copyright 2017 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package io.confluent.ksql.function.udf.json;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import io.confluent.ksql.function.KsqlFunctionException;
import io.confluent.ksql.function.udf.Kudf;
import io.confluent.ksql.util.KsqlException;

import java.io.IOException;
import java.util.Arrays;

import static com.fasterxml.jackson.core.JsonFactory.Feature.CANONICALIZE_FIELD_NAMES;
import static com.fasterxml.jackson.core.JsonToken.END_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.START_ARRAY;
import static com.fasterxml.jackson.core.JsonToken.VALUE_FALSE;
import static com.fasterxml.jackson.core.JsonToken.VALUE_NULL;
import static com.fasterxml.jackson.core.JsonToken.VALUE_NUMBER_FLOAT;
import static com.fasterxml.jackson.core.JsonToken.VALUE_NUMBER_INT;
import static com.fasterxml.jackson.core.JsonToken.VALUE_STRING;
import static com.fasterxml.jackson.core.JsonToken.VALUE_TRUE;

public class JsonArrayContainsKudf
implements Kudf {
private static final JsonFactory JSON_FACTORY = new JsonFactory()
.disable(CANONICALIZE_FIELD_NAMES);

@Override
public void init() {
}

@Override
public Object evaluate(Object... args)
{
if (args.length != 2) {
throw new KsqlFunctionException("ARRAY_CONTAINS udf should have two input argument. Given: " + Arrays.toString(args));
}
String jsonString = args[0].toString();
Object search = args[1];

return ifContains(jsonString, search);
}

private boolean ifContains(String json, Object searchValue)
{
JsonToken valueType = getType(searchValue);
try (JsonParser parser = JSON_FACTORY.createParser(json)) {
if (parser.nextToken() != START_ARRAY) {
return false;
}

while (parser.currentToken() != null) {
JsonToken token = parser.nextToken();
if (token == null) {
return token == searchValue;
}
if (token == END_ARRAY) {
return false;
}
parser.skipChildren();
if (valueType == token) {
if (valueType == VALUE_NULL && searchValue == null) {
return true;
} else if ((valueType == VALUE_STRING)
&& parser.getText().equals(searchValue)) {
return true;
} else if((valueType == VALUE_FALSE || valueType == VALUE_TRUE)
&& (parser.getBooleanValue() == (boolean)searchValue)) {
return true;
} else if((valueType == VALUE_NUMBER_INT)) {
if(searchValue instanceof Integer && parser.getIntValue() == (int) searchValue) {
return true;
} else if(searchValue instanceof Long && parser.getLongValue() == (long) searchValue) {
return true;
}
} else if((valueType == VALUE_NUMBER_FLOAT)
&& parser.getDoubleValue() == (double)searchValue) {
return true;
}
}
}
}
catch (IOException e) {
throw new KsqlException("Invalid JSON format.", e);
}
return false;
}

/**
* Returns JsonToken type of the targetValue
*/
private JsonToken getType(Object searchValue)
{
if(searchValue instanceof Long || searchValue instanceof Integer) {
return VALUE_NUMBER_INT;
} else if(searchValue instanceof Double) {
return VALUE_NUMBER_FLOAT;
} else if(searchValue instanceof String) {
return VALUE_STRING;
} else if(searchValue == null) {
return VALUE_NULL;
} else if(searchValue instanceof Boolean) {
boolean value = (boolean) searchValue;
return value ? VALUE_TRUE : VALUE_FALSE;
}
throw new KsqlFunctionException("Invalid Type for search value " + searchValue);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package io.confluent.ksql.function.udf.json;

import junit.framework.TestCase;
import org.junit.Test;

/**
 * Unit tests for {@link JsonArrayContainsKudf}.
 *
 * <p>This is a JUnit 3 style test: methods are discovered through the {@code test} name prefix
 * inherited from {@link TestCase}.
 */
public class JsonArrayContainsKudfTest
    extends TestCase {
  private JsonArrayContainsKudf jsonUdf = new JsonArrayContainsKudf();

  @Override
  public void setUp()
      throws Exception {
    jsonUdf.init();
  }

  /** Runs the UDF and unwraps its result so assertions can state intent directly. */
  private boolean contains(String json, Object searchValue) {
    return (Boolean) jsonUdf.evaluate(json, searchValue);
  }

  public void testEmptyArray() {
    assertFalse(contains("[]", true));
    assertFalse(contains("[]", false));
    assertFalse(contains("[]", null));
    assertFalse(contains("[]", 1.0));
    assertFalse(contains("[]", 100));
    assertFalse(contains("[]", "abc"));
    assertFalse(contains("[]", ""));
  }

  public void testNullArray() {
    assertTrue(contains("[null]", null));
    assertFalse(contains("[null]", "null"));
    assertFalse(contains("[null]", true));
    assertFalse(contains("[null]", false));
    assertFalse(contains("[null]", 1.0));
    assertFalse(contains("[null]", 100));
    assertFalse(contains("[null]", "abc"));
    assertFalse(contains("[null]", ""));
  }

  // The annotation has no effect under TestCase; the method runs because of its name.
  @Test
  public void testIntegerValues() {
    String json = "[2147483647, {\"ab\":null }, -2147483648, 1, 2, 3, null, [4], 4]";
    assertTrue(contains(json, 2147483647));
    assertTrue(contains(json, -2147483648));
    assertTrue(contains(json, 1));
    assertTrue(contains(json, 2));
    assertTrue(contains(json, 3));
    // A bare scalar is not an array.
    assertFalse(contains("5", 5));
    assertFalse(contains(json, 5));
  }

  public void testIfArrayContainsLongValue() {
    assertTrue(contains("[1]", 1L));
    assertTrue(contains("[1111111111111111]", 1111111111111111L));
    assertTrue(contains("[[222222222222222], 33333]", 33333L));
    assertTrue(contains("[{}, \"abc\", null, 1]", 1L));
    // Values that only occur inside nested arrays or objects must not match.
    assertFalse(contains("[[222222222222222], 33333]", 222222222222222L));
    assertFalse(contains("[{}, \"abc\", null, [1]]", 1L));
    assertFalse(contains("[{}, \"abc\", null, {\"1\":1}]", 1L));
  }

  public void testIfArrayContainsDoubleValue() {
    assertTrue(contains("[1.5]", 1.5));
    assertTrue(contains("[1.5, 2.25]", 2.25));
    assertTrue(contains("[{}, \"abc\", null, -0.5]", -0.5));
    assertTrue(contains("[1e3]", 1000.0));
    assertFalse(contains("[1.5]", 2.5));
    // A string that looks like the number is still a string.
    assertFalse(contains("[\"1.5\"]", 1.5));
    // Values that only occur inside nested arrays or objects must not match.
    assertFalse(contains("[[1.5], {\"a\": 1.5}]", 1.5));
    assertFalse(contains("1.5", 1.5));
  }

  public void testIfArrayContainsStringValue() {
    assertTrue(contains("[\"abc\"]", "abc"));
    assertTrue(contains("[\"cbda\", \"abc\"]", "abc"));
    assertTrue(contains("[{}, \"abc\", null, 1]", "abc"));
    assertTrue(contains("[\"\"]", ""));
    assertFalse(contains("[\"\"]", null));
    assertFalse(contains("[1,2,3]", "1"));
    assertFalse(contains("[null]", ""));
    assertFalse(contains("[\"abc\", \"dba\"]", "abd"));
  }

  public void testIfArrayContainsBooleanValue() {
    assertTrue(contains("[false, false, true, false]", true));
    assertTrue(contains("[true, true, false]", false));
    assertFalse(contains("[true, true]", false));
    assertFalse(contains("[false, false]", true));
  }
}

0 comments on commit ef64f44

Please sign in to comment.