From 6f9ea5b55fd1528a6b552c9ae0f5b23993ef41bf Mon Sep 17 00:00:00 2001 From: Dave Robertson Date: Wed, 10 Jan 2018 12:54:55 +1300 Subject: [PATCH] jsonNode array index access --- .../udf/json/JsonExtractStringKudf.java | 7 ++++- .../ksql/integration/JsonFormatTest.java | 30 +++++++++++++++++-- .../ksql/util/json/JsonPathTokenizerTest.java | 11 +++---- 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonExtractStringKudf.java b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonExtractStringKudf.java index ed6609b95315..978f6e01ec8e 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonExtractStringKudf.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/function/udf/json/JsonExtractStringKudf.java @@ -58,7 +58,12 @@ public Object evaluate(Object... args) { if (currentNode == null) { return null; } - currentNode = currentNode.get(token); + try { + int index = Integer.parseInt(token); + currentNode = currentNode.get(index); + } catch (NumberFormatException e) { + currentNode = currentNode.get(token); + } } if (currentNode == null) { return null; diff --git a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java index b2d8d5c0dd50..c6ff70481d4e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/integration/JsonFormatTest.java @@ -102,8 +102,8 @@ private void produceInitData() throws Exception { Schema messageSchema = SchemaBuilder.struct().field("MESSAGE", SchemaBuilder.STRING_SCHEMA).build(); GenericRow messageRow = new GenericRow(Arrays.asList - ("{\"log\":{\"@timestamp\":\"2017-05-30T16:44:22.175Z\",\"@version\":\"1\"," - + "\"caasVersion\":\"0.0.2\",\"cloud\":\"aws\",\"clusterId\":\"cp99\",\"clusterName\":\"kafka\",\"cpComponentId\":\"kafka\",\"host\":\"kafka-1-wwl0p\",\"k8sId\":\"k8s13\",\"k8sName\":\"perf\",\"level\":\"ERROR\",\"logger\":\"kafka.server.ReplicaFetcherThread\",\"message\":\"Found invalid messages during fetch for partition [foo512,172] offset 0 error Record is corrupt (stored crc = 1321230880, computed crc = 1139143803)\",\"networkId\":\"vpc-d8c7a9bf\",\"region\":\"us-west-2\",\"serverId\":\"1\",\"skuId\":\"sku5\",\"source\":\"kafka\",\"tenantId\":\"t47\",\"tenantName\":\"perf-test\",\"thread\":\"ReplicaFetcherThread-0-2\",\"zone\":\"us-west-2a\"},\"stream\":\"stdout\",\"time\":2017}")); + ("{\"log\":{\"@timestamp\":\"2017-05-30T16:44:22.175Z\",\"@version\":\"1\"," + + "\"caasVersion\":\"0.0.2\",\"cloud\":\"aws\",\"logs\":[{\"entry\":\"first\"}],\"clusterId\":\"cp99\",\"clusterName\":\"kafka\",\"cpComponentId\":\"kafka\",\"host\":\"kafka-1-wwl0p\",\"k8sId\":\"k8s13\",\"k8sName\":\"perf\",\"level\":\"ERROR\",\"logger\":\"kafka.server.ReplicaFetcherThread\",\"message\":\"Found invalid messages during fetch for partition [foo512,172] offset 0 error Record is corrupt (stored crc = 1321230880, computed crc = 1139143803)\",\"networkId\":\"vpc-d8c7a9bf\",\"region\":\"us-west-2\",\"serverId\":\"1\",\"skuId\":\"sku5\",\"source\":\"kafka\",\"tenantId\":\"t47\",\"tenantName\":\"perf-test\",\"thread\":\"ReplicaFetcherThread-0-2\",\"zone\":\"us-west-2a\"},\"stream\":\"stdout\",\"time\":2017}")); Map records = new HashMap<>(); records.put("1", messageRow); @@ -224,6 +224,32 @@ public void testJsonStreamExtractor() throws Exception { ksqlEngine.terminateQuery(queryMetadata.getId(), true); } + @Test + public void testJsonStreamExtractorNested() throws Exception { + + final String streamName = "JSONSTREAM"; + final String queryString = String.format("CREATE STREAM %s AS SELECT EXTRACTJSONFIELD" + + "(message, '$.log.logs[0].entry') " + + "FROM %s;", + streamName, messageLogStream); + + PersistentQueryMetadata queryMetadata = + (PersistentQueryMetadata) ksqlEngine.buildMultipleQueries(queryString, Collections.emptyMap()).get(0); + queryMetadata.getKafkaStreams().start(); + + Schema resultSchema = SchemaUtil + .removeImplicitRowTimeRowKeyFromSchema(metaStore.getSource(streamName).getSchema()); + + Map expectedResults = new HashMap<>(); + expectedResults.put("1", new GenericRow(Arrays.asList("first"))); + + Map results = readNormalResults(streamName, resultSchema, expectedResults.size()); + + assertThat(results, equalTo(expectedResults)); + + ksqlEngine.terminateQuery(queryMetadata.getId(), true); + } + //*********************************************************// private Map readNormalResults(String resultTopic, Schema resultSchema, int expectedNumMessages) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/util/json/JsonPathTokenizerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/util/json/JsonPathTokenizerTest.java index 237c80adef8d..aeb8368b1c4b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/util/json/JsonPathTokenizerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/util/json/JsonPathTokenizerTest.java @@ -28,13 +28,14 @@ public class JsonPathTokenizerTest { @Test public void testJsonPathTokenizer() throws IOException { - JsonPathTokenizer jsonPathTokenizer = new JsonPathTokenizer("$.log.cloud.region"); + JsonPathTokenizer jsonPathTokenizer = new JsonPathTokenizer("$.logs[0].cloud.region"); ImmutableList tokens = ImmutableList.copyOf(jsonPathTokenizer); List tokenList = tokens.asList(); - Assert.assertTrue(tokenList.size() == 3); - Assert.assertTrue(tokenList.get(0).equalsIgnoreCase("log")); - Assert.assertTrue(tokenList.get(1).equalsIgnoreCase("cloud")); - Assert.assertTrue(tokenList.get(2).equalsIgnoreCase("region")); + Assert.assertTrue(tokenList.size() == 4); + Assert.assertTrue(tokenList.get(0).equalsIgnoreCase("logs")); + Assert.assertTrue(tokenList.get(1).equalsIgnoreCase("0")); + Assert.assertTrue(tokenList.get(2).equalsIgnoreCase("cloud")); + Assert.assertTrue(tokenList.get(3).equalsIgnoreCase("region")); }