From 59e9fd6227117d3b309975256e87b1ab324d6e30 Mon Sep 17 00:00:00 2001 From: David Venable Date: Tue, 14 May 2024 14:10:01 -0500 Subject: [PATCH] Adds an ndjson input codec. This reads JSON objects for ND-JSON and more lenient formats that do not have the newline. (#4533) Signed-off-by: David Venable --- .../parse-json-processor/build.gradle | 1 + .../plugins/codec/json/NdjsonInputCodec.java | 73 ++++ .../plugins/codec/json/NdjsonInputConfig.java | 24 ++ .../codec/json/NdjsonInputCodecTest.java | 315 ++++++++++++++++++ 4 files changed, 413 insertions(+) create mode 100644 data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodec.java create mode 100644 data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java create mode 100644 data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodecTest.java diff --git a/data-prepper-plugins/parse-json-processor/build.gradle b/data-prepper-plugins/parse-json-processor/build.gradle index ae02275bfe..44959173ba 100644 --- a/data-prepper-plugins/parse-json-processor/build.gradle +++ b/data-prepper-plugins/parse-json-processor/build.gradle @@ -15,6 +15,7 @@ dependencies { implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' implementation 'org.apache.parquet:parquet-common:1.14.0' testImplementation project(':data-prepper-test-common') + testImplementation project(':data-prepper-test-event') } test { diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodec.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodec.java new file mode 100644 index 0000000000..2042139974 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodec.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.codec.json; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; +import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.codec.InputCodec; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.event.LogEventBuilder; +import org.opensearch.dataprepper.model.log.Log; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; + +/** + * A Data Prepper {@link InputCodec} which reads ND-JSON and other similar + * formats which have JSON objects together. + */ +@DataPrepperPlugin(name = "ndjson", pluginType = InputCodec.class, pluginConfigurationType = NdjsonInputConfig.class) +public class NdjsonInputCodec implements InputCodec { + private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference<>() {}; + private final ObjectMapper objectMapper = new ObjectMapper(); + private final NdjsonInputConfig ndjsonInputConfig; + private final EventFactory eventFactory; + private final JsonFactory jsonFactory; + + @DataPrepperPluginConstructor + public NdjsonInputCodec(final NdjsonInputConfig ndjsonInputConfig, final EventFactory eventFactory) { + this.ndjsonInputConfig = ndjsonInputConfig; + this.eventFactory = eventFactory; + jsonFactory = new JsonFactory(); + } + + @Override + public void parse(final InputStream inputStream, final Consumer> eventConsumer) throws IOException { + Objects.requireNonNull(inputStream, "Parameter inputStream must not be null."); + Objects.requireNonNull(eventConsumer, "Parameter eventConsumer must not be null."); + + final JsonParser parser = jsonFactory.createParser(inputStream); + + final MappingIterator> mapMappingIterator = objectMapper.readValues(parser, MAP_TYPE_REFERENCE); + while (mapMappingIterator.hasNext()) { + final Map json = mapMappingIterator.next(); + + if(!ndjsonInputConfig.isIncludeEmptyObjects() && json.isEmpty()) + continue; + + final Record record = createRecord(json); + eventConsumer.accept(record); + } + } + + private Record createRecord(final Map json) { + final Log event = eventFactory.eventBuilder(LogEventBuilder.class) + .withData(json) + .build(); + + return new Record<>(event); + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java new file mode 100644 index 0000000000..2fb903873b --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/main/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputConfig.java @@ -0,0 +1,24 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.codec.json; + +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Configuration for the {@link NdjsonInputCodec} input codec. + */ +public class NdjsonInputConfig { + /** + * By default, we will not create events for empty objects. However, we will + * permit users to include them if they desire. + */ + @JsonProperty("include_empty_objects") + private boolean includeEmptyObjects = false; + + public boolean isIncludeEmptyObjects() { + return includeEmptyObjects; + } +} diff --git a/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodecTest.java b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodecTest.java new file mode 100644 index 0000000000..ce113e19d8 --- /dev/null +++ b/data-prepper-plugins/parse-json-processor/src/test/java/org/opensearch/dataprepper/plugins/codec/json/NdjsonInputCodecTest.java @@ -0,0 +1,315 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.codec.json; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.event.TestEventFactory; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.record.Record; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class NdjsonInputCodecTest { + @Mock + private NdjsonInputConfig config; + + private EventFactory eventFactory; + + @Mock + private Consumer> eventConsumer; + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + @BeforeEach + void setUp() { + eventFactory = TestEventFactory.getTestEventFactory(); + } + + private NdjsonInputCodec createObjectUnderTest() { + return new NdjsonInputCodec(config, eventFactory); + } + + @Test + void parse_with_null_InputStream_throws() { + final NdjsonInputCodec objectUnderTest = createObjectUnderTest(); + + assertThrows(NullPointerException.class, () -> + objectUnderTest.parse(null, eventConsumer)); + + verifyNoInteractions(eventConsumer); + } + + @Test + void parse_with_InputStream_null_Consumer_throws() { + final NdjsonInputCodec objectUnderTest = createObjectUnderTest(); + + final InputStream inputStream = mock(InputStream.class); + assertThrows(NullPointerException.class, () -> + objectUnderTest.parse(inputStream, null)); + + verifyNoInteractions(inputStream); + } + + @Test + void parse_with_empty_InputStream_does_not_call_Consumer() throws IOException { + final ByteArrayInputStream emptyInputStream = new ByteArrayInputStream(new byte[]{}); + + createObjectUnderTest().parse(emptyInputStream, eventConsumer); + + verifyNoInteractions(eventConsumer); + } + + @ParameterizedTest + @ArgumentsSource(ValidInputStreamFormatsArgumentsProvider.class) + void parse_includes_objects_from_single_line_of_JSON_objects(final InputStreamFormat inputStreamFormat, final int numberOfObjects) throws IOException { + final List> objects = IntStream.range(0, numberOfObjects) + .mapToObj(i -> generateJson()) + .collect(Collectors.toList()); + + final InputStream inputStream = inputStreamFormat.createInputStream(objects); + + createObjectUnderTest().parse(inputStream, eventConsumer); + + final ArgumentCaptor> eventConsumerCaptor = ArgumentCaptor.forClass(Record.class); + + verify(eventConsumer, times(numberOfObjects)).accept(eventConsumerCaptor.capture()); + + final List> capturedRecords = eventConsumerCaptor.getAllValues(); + + for (int i = 0; i < numberOfObjects; i++) { + final Map expectedObject = objects.get(i); + final Record actualRecord = capturedRecords.get(i); + assertThat(actualRecord, notNullValue()); + final Event actualEvent = actualRecord.getData(); + assertThat(actualEvent, notNullValue()); + + final Map actualEventMap = actualEvent.toMap(); + assertThat(actualEventMap, equalTo(expectedObject)); + } + } + + @ParameterizedTest + @ArgumentsSource(ValidInputStreamFormatsArgumentsProvider.class) + void parse_excludes_empty_objects(final InputStreamFormat inputStreamFormat, final int numberOfObjects) throws IOException { + final List> objects = new ArrayList<>(); + final List> expectedObjects = new ArrayList<>(); + for (int i = 0; i < numberOfObjects; i++) { + final Map emptyJson = Collections.emptyMap(); + final Map json = generateJson(); + objects.add(emptyJson); + objects.add(json); + expectedObjects.add(json); + } + + final InputStream inputStream = inputStreamFormat.createInputStream(objects); + + createObjectUnderTest().parse(inputStream, eventConsumer); + + final ArgumentCaptor> eventConsumerCaptor = ArgumentCaptor.forClass(Record.class); + + verify(eventConsumer, times(numberOfObjects)).accept(eventConsumerCaptor.capture()); + + final List> capturedRecords = eventConsumerCaptor.getAllValues(); + + for (int i = 0; i < numberOfObjects; i++) { + final Map expectedObject = expectedObjects.get(i); + final Record actualRecord = capturedRecords.get(i); + assertThat(actualRecord, notNullValue()); + final Event actualEvent = actualRecord.getData(); + assertThat(actualEvent, notNullValue()); + + final Map actualEventMap = actualEvent.toMap(); + assertThat(actualEventMap, equalTo(expectedObject)); + } + } + + @ParameterizedTest + @ArgumentsSource(ValidInputStreamFormatsArgumentsProvider.class) + void parse_includes_empty_objects_when_configured(final InputStreamFormat inputStreamFormat, final int numberOfObjects) throws IOException { + final List> objects = new ArrayList<>(); + for (int i = 0; i < numberOfObjects; i++) { + final Map emptyJson = Collections.emptyMap(); + final Map json = generateJson(); + objects.add(emptyJson); + objects.add(json); + } + + final InputStream inputStream = inputStreamFormat.createInputStream(objects); + + when(config.isIncludeEmptyObjects()).thenReturn(true); + createObjectUnderTest().parse(inputStream, eventConsumer); + + final ArgumentCaptor> eventConsumerCaptor = ArgumentCaptor.forClass(Record.class); + + verify(eventConsumer, times(objects.size())).accept(eventConsumerCaptor.capture()); + + final List> capturedRecords = eventConsumerCaptor.getAllValues(); + + for (int i = 0; i < numberOfObjects; i++) { + final Map expectedObject = objects.get(2*i+1); + final Record expectedEmptyRecord = capturedRecords.get(2*i); + assertThat(expectedEmptyRecord, notNullValue()); + assertThat(expectedEmptyRecord.getData(), notNullValue()); + assertThat(expectedEmptyRecord.getData().toMap().size(), equalTo(0)); + + final Record actualRecord = capturedRecords.get(2*i+1); + assertThat(actualRecord, notNullValue()); + final Event actualEvent = actualRecord.getData(); + assertThat(actualEvent, notNullValue()); + + final Map actualEventMap = actualEvent.toMap(); + assertThat(actualEventMap, equalTo(expectedObject)); + } + } + + static class ValidInputStreamFormatsArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext extensionContext) throws Exception { + return Stream.of( + arguments(new StrictNdJsonInputStreamFormat(), 1), + arguments(new StrictNdJsonInputStreamFormat(), 2), + arguments(new StrictNdJsonInputStreamFormat(), 10), + arguments(new AllObjectsOnSameLineInputStreamFormat(), 1), + arguments(new AllObjectsOnSameLineInputStreamFormat(), 2), + arguments(new AllObjectsOnSameLineInputStreamFormat(), 10), + arguments(new AllObjectsOnSameLineWithSpacesInputStreamFormat(), 10), + arguments(new MixedInputStreamFormat(), 3), + arguments(new MixedInputStreamFormat(), 10) + ); + } + } + + interface InputStreamFormat { + InputStream createInputStream(final List> jsonObjects) throws JsonProcessingException; + } + + static class StrictNdJsonInputStreamFormat implements InputStreamFormat { + @Override + public InputStream createInputStream(final List> jsonObjects) throws JsonProcessingException { + final StringWriter writer = new StringWriter(); + + for (final Map jsonObject : jsonObjects) { + writer.append(OBJECT_MAPPER.writeValueAsString(jsonObject)); + writer.append(System.lineSeparator()); + } + + return new ByteArrayInputStream(writer.toString().getBytes()); + } + + @Override + public String toString() { + return "Strict ND-JSON"; + } + } + + static class AllObjectsOnSameLineInputStreamFormat implements InputStreamFormat { + @Override + public InputStream createInputStream(final List> jsonObjects) throws JsonProcessingException { + final StringWriter writer = new StringWriter(); + + for (final Map jsonObject : jsonObjects) { + writer.append(OBJECT_MAPPER.writeValueAsString(jsonObject)); + } + + return new ByteArrayInputStream(writer.toString().getBytes()); + } + + @Override + public String toString() { + return "Single line"; + } + } + + static class AllObjectsOnSameLineWithSpacesInputStreamFormat implements InputStreamFormat { + @Override + public InputStream createInputStream(final List> jsonObjects) throws JsonProcessingException { + final StringWriter writer = new StringWriter(); + + for (final Map jsonObject : jsonObjects) { + writer.append(OBJECT_MAPPER.writeValueAsString(jsonObject)); + writer.append(" "); + } + + return new ByteArrayInputStream(writer.toString().getBytes()); + } + + @Override + public String toString() { + return "Spaces"; + } + } + + static class MixedInputStreamFormat implements InputStreamFormat { + @Override + public InputStream createInputStream(final List> jsonObjects) throws JsonProcessingException { + final StringWriter writer = new StringWriter(); + + int counter = 0; + for (final Map jsonObject : jsonObjects) { + writer.append(OBJECT_MAPPER.writeValueAsString(jsonObject)); + if(counter % 2 == 0) + writer.append(System.lineSeparator()); + counter++; + } + + return new ByteArrayInputStream(writer.toString().getBytes()); + } + + @Override + public String toString() { + return "Mixed"; + } + } + + private static Map generateJson() { + final Map jsonObject = new LinkedHashMap<>(); + for (int i = 0; i < 1; i++) { + jsonObject.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + jsonObject.put(UUID.randomUUID().toString(), Arrays.asList(UUID.randomUUID().toString(), UUID.randomUUID().toString(), UUID.randomUUID().toString())); + + return jsonObject; + } +} \ No newline at end of file