Skip to content

Commit

Permalink
jsonNode array index access
Browse files Browse the repository at this point in the history
  • Loading branch information
robert2d committed Jan 19, 2018
1 parent 09c5da7 commit 6f9ea5b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,12 @@ public Object evaluate(Object... args) {
if (currentNode == null) {
return null;
}
currentNode = currentNode.get(token);
try {
int index = Integer.parseInt(token);
currentNode = currentNode.get(index);
} catch (NumberFormatException e) {
currentNode = currentNode.get(token);
}
}
if (currentNode == null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ private void produceInitData() throws Exception {
Schema messageSchema = SchemaBuilder.struct().field("MESSAGE", SchemaBuilder.STRING_SCHEMA).build();

GenericRow messageRow = new GenericRow(Arrays.asList
("{\"log\":{\"@timestamp\":\"2017-05-30T16:44:22.175Z\",\"@version\":\"1\","
+ "\"caasVersion\":\"0.0.2\",\"cloud\":\"aws\",\"clusterId\":\"cp99\",\"clusterName\":\"kafka\",\"cpComponentId\":\"kafka\",\"host\":\"kafka-1-wwl0p\",\"k8sId\":\"k8s13\",\"k8sName\":\"perf\",\"level\":\"ERROR\",\"logger\":\"kafka.server.ReplicaFetcherThread\",\"message\":\"Found invalid messages during fetch for partition [foo512,172] offset 0 error Record is corrupt (stored crc = 1321230880, computed crc = 1139143803)\",\"networkId\":\"vpc-d8c7a9bf\",\"region\":\"us-west-2\",\"serverId\":\"1\",\"skuId\":\"sku5\",\"source\":\"kafka\",\"tenantId\":\"t47\",\"tenantName\":\"perf-test\",\"thread\":\"ReplicaFetcherThread-0-2\",\"zone\":\"us-west-2a\"},\"stream\":\"stdout\",\"time\":2017}"));
("{\"log\":{\"@timestamp\":\"2017-05-30T16:44:22.175Z\",\"@version\":\"1\","
+ "\"caasVersion\":\"0.0.2\",\"cloud\":\"aws\",\"logs\":[{\"entry\":\"first\"}],\"clusterId\":\"cp99\",\"clusterName\":\"kafka\",\"cpComponentId\":\"kafka\",\"host\":\"kafka-1-wwl0p\",\"k8sId\":\"k8s13\",\"k8sName\":\"perf\",\"level\":\"ERROR\",\"logger\":\"kafka.server.ReplicaFetcherThread\",\"message\":\"Found invalid messages during fetch for partition [foo512,172] offset 0 error Record is corrupt (stored crc = 1321230880, computed crc = 1139143803)\",\"networkId\":\"vpc-d8c7a9bf\",\"region\":\"us-west-2\",\"serverId\":\"1\",\"skuId\":\"sku5\",\"source\":\"kafka\",\"tenantId\":\"t47\",\"tenantName\":\"perf-test\",\"thread\":\"ReplicaFetcherThread-0-2\",\"zone\":\"us-west-2a\"},\"stream\":\"stdout\",\"time\":2017}"));

Map<String, GenericRow> records = new HashMap<>();
records.put("1", messageRow);
Expand Down Expand Up @@ -224,6 +224,32 @@ public void testJsonStreamExtractor() throws Exception {
ksqlEngine.terminateQuery(queryMetadata.getId(), true);
}

@Test
public void testJsonStreamExtractorNested() throws Exception {

final String streamName = "JSONSTREAM";
final String queryString = String.format("CREATE STREAM %s AS SELECT EXTRACTJSONFIELD"
+ "(message, '$.log.logs[0].entry') "
+ "FROM %s;",
streamName, messageLogStream);

PersistentQueryMetadata queryMetadata =
(PersistentQueryMetadata) ksqlEngine.buildMultipleQueries(queryString, Collections.emptyMap()).get(0);
queryMetadata.getKafkaStreams().start();

Schema resultSchema = SchemaUtil
.removeImplicitRowTimeRowKeyFromSchema(metaStore.getSource(streamName).getSchema());

Map<String, GenericRow> expectedResults = new HashMap<>();
expectedResults.put("1", new GenericRow(Arrays.asList("first")));

Map<String, GenericRow> results = readNormalResults(streamName, resultSchema, expectedResults.size());

assertThat(results, equalTo(expectedResults));

ksqlEngine.terminateQuery(queryMetadata.getId(), true);
}

//*********************************************************//

private Map<String, GenericRow> readNormalResults(String resultTopic, Schema resultSchema, int expectedNumMessages) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ public class JsonPathTokenizerTest {

@Test
public void testJsonPathTokenizer() throws IOException {
JsonPathTokenizer jsonPathTokenizer = new JsonPathTokenizer("$.log.cloud.region");
JsonPathTokenizer jsonPathTokenizer = new JsonPathTokenizer("$.logs[0].cloud.region");
ImmutableList<String> tokens = ImmutableList.copyOf(jsonPathTokenizer);
List<String> tokenList = tokens.asList();
Assert.assertTrue(tokenList.size() == 3);
Assert.assertTrue(tokenList.get(0).equalsIgnoreCase("log"));
Assert.assertTrue(tokenList.get(1).equalsIgnoreCase("cloud"));
Assert.assertTrue(tokenList.get(2).equalsIgnoreCase("region"));
Assert.assertTrue(tokenList.size() == 4);
Assert.assertTrue(tokenList.get(0).equalsIgnoreCase("logs"));
Assert.assertTrue(tokenList.get(1).equalsIgnoreCase("0"));
Assert.assertTrue(tokenList.get(2).equalsIgnoreCase("cloud"));
Assert.assertTrue(tokenList.get(3).equalsIgnoreCase("region"));

}

Expand Down

0 comments on commit 6f9ea5b

Please sign in to comment.