-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BUG] jsonNode array index access for EXTRACTJSONFIELD #610
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,8 +102,8 @@ private void produceInitData() throws Exception { | |
Schema messageSchema = SchemaBuilder.struct().field("MESSAGE", SchemaBuilder.STRING_SCHEMA).build(); | ||
|
||
GenericRow messageRow = new GenericRow(Arrays.asList | ||
("{\"log\":{\"@timestamp\":\"2017-05-30T16:44:22.175Z\",\"@version\":\"1\"," | ||
+ "\"caasVersion\":\"0.0.2\",\"cloud\":\"aws\",\"clusterId\":\"cp99\",\"clusterName\":\"kafka\",\"cpComponentId\":\"kafka\",\"host\":\"kafka-1-wwl0p\",\"k8sId\":\"k8s13\",\"k8sName\":\"perf\",\"level\":\"ERROR\",\"logger\":\"kafka.server.ReplicaFetcherThread\",\"message\":\"Found invalid messages during fetch for partition [foo512,172] offset 0 error Record is corrupt (stored crc = 1321230880, computed crc = 1139143803)\",\"networkId\":\"vpc-d8c7a9bf\",\"region\":\"us-west-2\",\"serverId\":\"1\",\"skuId\":\"sku5\",\"source\":\"kafka\",\"tenantId\":\"t47\",\"tenantName\":\"perf-test\",\"thread\":\"ReplicaFetcherThread-0-2\",\"zone\":\"us-west-2a\"},\"stream\":\"stdout\",\"time\":2017}")); | ||
("{\"log\":{\"@timestamp\":\"2017-05-30T16:44:22.175Z\",\"@version\":\"1\"," | ||
+ "\"caasVersion\":\"0.0.2\",\"cloud\":\"aws\",\"logs\":[{\"entry\":\"first\"}],\"clusterId\":\"cp99\",\"clusterName\":\"kafka\",\"cpComponentId\":\"kafka\",\"host\":\"kafka-1-wwl0p\",\"k8sId\":\"k8s13\",\"k8sName\":\"perf\",\"level\":\"ERROR\",\"logger\":\"kafka.server.ReplicaFetcherThread\",\"message\":\"Found invalid messages during fetch for partition [foo512,172] offset 0 error Record is corrupt (stored crc = 1321230880, computed crc = 1139143803)\",\"networkId\":\"vpc-d8c7a9bf\",\"region\":\"us-west-2\",\"serverId\":\"1\",\"skuId\":\"sku5\",\"source\":\"kafka\",\"tenantId\":\"t47\",\"tenantName\":\"perf-test\",\"thread\":\"ReplicaFetcherThread-0-2\",\"zone\":\"us-west-2a\"},\"stream\":\"stdout\",\"time\":2017}")); | ||
|
||
Map<String, GenericRow> records = new HashMap<>(); | ||
records.put("1", messageRow); | ||
|
@@ -224,6 +224,32 @@ public void testJsonStreamExtractor() throws Exception { | |
ksqlEngine.terminateQuery(queryMetadata.getId(), true); | ||
} | ||
|
||
@Test | ||
public void testJsonStreamExtractorNested() throws Exception { | ||
|
||
final String streamName = "JSONSTREAM"; | ||
final String queryString = String.format("CREATE STREAM %s AS SELECT EXTRACTJSONFIELD" | ||
+ "(message, '$.log.logs[0].entry') " | ||
+ "FROM %s;", | ||
streamName, messageLogStream); | ||
|
||
PersistentQueryMetadata queryMetadata = | ||
(PersistentQueryMetadata) ksqlEngine.buildMultipleQueries(queryString, Collections.emptyMap()).get(0); | ||
queryMetadata.getKafkaStreams().start(); | ||
|
||
Schema resultSchema = SchemaUtil | ||
.removeImplicitRowTimeRowKeyFromSchema(metaStore.getSource(streamName).getSchema()); | ||
|
||
Map<String, GenericRow> expectedResults = new HashMap<>(); | ||
expectedResults.put("1", new GenericRow(Arrays.asList("first"))); | ||
|
||
Map<String, GenericRow> results = readNormalResults(streamName, resultSchema, expectedResults.size()); | ||
|
||
assertThat(results, equalTo(expectedResults)); | ||
|
||
ksqlEngine.terminateQuery(queryMetadata.getId(), true); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This should probably be done in a `try{...}finally{...}` otherwise it won't run when the test fails |
||
} | ||
|
||
//*********************************************************// | ||
|
||
private Map<String, GenericRow> readNormalResults(String resultTopic, Schema resultSchema, int expectedNumMessages) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,13 +28,14 @@ public class JsonPathTokenizerTest { | |
|
||
@Test | ||
public void testJsonPathTokenizer() throws IOException { | ||
JsonPathTokenizer jsonPathTokenizer = new JsonPathTokenizer("$.log.cloud.region"); | ||
JsonPathTokenizer jsonPathTokenizer = new JsonPathTokenizer("$.logs[0].cloud.region"); | ||
ImmutableList<String> tokens = ImmutableList.copyOf(jsonPathTokenizer); | ||
List<String> tokenList = tokens.asList(); | ||
Assert.assertTrue(tokenList.size() == 3); | ||
Assert.assertTrue(tokenList.get(0).equalsIgnoreCase("log")); | ||
Assert.assertTrue(tokenList.get(1).equalsIgnoreCase("cloud")); | ||
Assert.assertTrue(tokenList.get(2).equalsIgnoreCase("region")); | ||
Assert.assertTrue(tokenList.size() == 4); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know these were previously |
||
Assert.assertTrue(tokenList.get(0).equalsIgnoreCase("logs")); | ||
Assert.assertTrue(tokenList.get(1).equalsIgnoreCase("0")); | ||
Assert.assertTrue(tokenList.get(2).equalsIgnoreCase("cloud")); | ||
Assert.assertTrue(tokenList.get(3).equalsIgnoreCase("region")); | ||
|
||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you have both previous tests and this one? The UDF should work for both cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No problem I thought this might come up. Reason I modified the existing is that it essentially performs both tests when accessing the nested nodes anyway. But I'll add back in the original spec unmodified and duplicate it into a new one for array access now 😄