diff --git a/PROCESSORS.md b/PROCESSORS.md index 293c317e41..50e9f5acbe 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -94,6 +94,7 @@ limitations under the License. - [RetryFlowFile](#RetryFlowFile) - [RouteOnAttribute](#RouteOnAttribute) - [RouteText](#RouteText) +- [SplitContent](#SplitContent) - [SplitText](#SplitText) - [TailEventLog](#TailEventLog) - [TailFile](#TailFile) @@ -2833,6 +2834,40 @@ In the list below, the names of required properties appear in bold. Any other pr | success | All files, containing log events, are routed to success | +## SplitContent + +### Description + +Splits incoming FlowFiles by a specified byte sequence + +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. + +| Name | Default Value | Allowable Values | Description | +|----------------------------|---------------|----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Byte Sequence Format** | Hexadecimal | Hexadecimal
Text | Specifies how the property should be interpreted | +| **Byte Sequence** | | | A representation of bytes to look for and upon which to split the source file into separate files | +| **Keep Byte Sequence** | false | true
false | Determines whether or not the Byte Sequence should be included with each Split | +| **Byte Sequence Location** | Trailing | Trailing
Leading | If is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; if is false, this property is ignored. | + +### Relationships + +| Name | Description | +|----------|------------------------------------------------------| +| original | The original file | +| splits | All Splits will be routed to the splits relationship | + +### Output Attributes + +| Attribute | Relationship | Description | +|---------------------------|--------------|--------------------------------------------------------------------------------------------------------------------------------| +| fragment.identifier | | All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute | +| fragment.index | | A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile | +| fragment.count | | The number of split FlowFiles generated from the parent FlowFile | +| segment.original.filename | | The filename of the parent FlowFile | + + ## SplitText ### Description diff --git a/README.md b/README.md index f325315e09..eeeea95970 100644 --- a/README.md +++ b/README.md @@ -63,9 +63,9 @@ MiNiFi - C++ supports the following C++ processors: The following table lists the base set of processors. -| Extension Set | Processors | -|---------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)
[AttributesToJSON](PROCESSORS.md#attributestojson)
[DefragmentText](PROCESSORS.md#defragmenttext)
[ExecuteProcess](PROCESSORS.md#executeprocess)
[ExtractText](PROCESSORS.md#extracttext)
[FetchFile](PROCESSORS.md#fetchfile)
[GenerateFlowFile](PROCESSORS.md#generateflowfile)
[GetFile](PROCESSORS.md#getfile)
[GetTCP](PROCESSORS.md#gettcp)
[HashContent](PROCESSORS.md#hashcontent)
[InvokeHTTP](PROCESSORS.md#invokehttp)
[ListenSyslog](PROCESSORS.md#listensyslog)
[ListenTCP](PROCESSORS.md#listentcp)
[ListenUDP](PROCESSORS.md#listenudp)
[ListFile](PROCESSORS.md#listfile)
[LogAttribute](PROCESSORS.md#logattribute)
[PutFile](PROCESSORS.md#putfile)
[PutTCP](PROCESSORS.md#puttcp)
[PutUDP](PROCESSORS.md#putudp)
[ReplaceText](PROCESSORS.md#replacetext)
[RetryFlowFile](PROCESSORS.md#retryflowfile)
[RouteOnAttribute](PROCESSORS.md#routeonattribute)
[RouteText](PROCESSORS.md#routetext)
[SplitText](PROCESSORS.md#splittext)
[TailFile](PROCESSORS.md#tailfile)
[UpdateAttribute](PROCESSORS.md#updateattribute) | +| Extension Set | Processors | +|---------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)
[AttributesToJSON](PROCESSORS.md#attributestojson)
[DefragmentText](PROCESSORS.md#defragmenttext)
[ExecuteProcess](PROCESSORS.md#executeprocess)
[ExtractText](PROCESSORS.md#extracttext)
[FetchFile](PROCESSORS.md#fetchfile)
[GenerateFlowFile](PROCESSORS.md#generateflowfile)
[GetFile](PROCESSORS.md#getfile)
[GetTCP](PROCESSORS.md#gettcp)
[HashContent](PROCESSORS.md#hashcontent)
[InvokeHTTP](PROCESSORS.md#invokehttp)
[ListenSyslog](PROCESSORS.md#listensyslog)
[ListenTCP](PROCESSORS.md#listentcp)
[ListenUDP](PROCESSORS.md#listenudp)
[ListFile](PROCESSORS.md#listfile)
[LogAttribute](PROCESSORS.md#logattribute)
[PutFile](PROCESSORS.md#putfile)
[PutTCP](PROCESSORS.md#puttcp)
[PutUDP](PROCESSORS.md#putudp)
[ReplaceText](PROCESSORS.md#replacetext)
[RetryFlowFile](PROCESSORS.md#retryflowfile)
[RouteOnAttribute](PROCESSORS.md#routeonattribute)
[RouteText](PROCESSORS.md#routetext)
[SplitContent](PROCESSORS.md#splitcontent)
[SplitText](PROCESSORS.md#splittext)
[TailFile](PROCESSORS.md#tailfile)
[UpdateAttribute](PROCESSORS.md#updateattribute) | The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as RocksDB ), can be disabled with the respective CMAKE flag on the command line. diff --git a/extensions/standard-processors/processors/SplitContent.cpp b/extensions/standard-processors/processors/SplitContent.cpp new file mode 100644 index 0000000000..9801176b67 --- /dev/null +++ b/extensions/standard-processors/processors/SplitContent.cpp @@ -0,0 +1,168 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SplitContent.h" + +#include + +#include "core/ProcessContext.h" +#include "core/ProcessSession.h" +#include "core/Resource.h" +#include "core/FlowFile.h" +#include "utils/gsl.h" +#include "utils/ProcessorConfigUtils.h" + +namespace org::apache::nifi::minifi::processors { + +constexpr size_t BUFFER_TARGET_SIZE = 1024; + +void SplitContent::initialize() { + setSupportedProperties(Properties); + setSupportedRelationships(Relationships); +} + +void SplitContent::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { + auto byte_sequence_str = utils::getRequiredPropertyOrThrow(context, ByteSequence.name); + const auto byte_sequence_format = utils::parseEnumProperty(context, ByteSequenceFormatProperty); + if (byte_sequence_format == ByteSequenceFormat::Hexadecimal) { + byte_sequence_ = utils::string::from_hex(byte_sequence_str); + } else { + byte_sequence_.resize(byte_sequence_str.size()); + std::ranges::transform(byte_sequence_str, byte_sequence_.begin(), [] (char c) { return static_cast(c); }); + } + byte_sequence_location_ = utils::parseEnumProperty(context, ByteSequenceLocationProperty); + keep_byte_sequence = utils::getRequiredPropertyOrThrow(context, KeepByteSequence.name); +} + +std::shared_ptr SplitContent::createNewSplit(core::ProcessSession& session) const { + auto next_split = session.create(); + if (!next_split) { + throw Exception(PROCESSOR_EXCEPTION, "Couldn't create FlowFile"); + } + if (keep_byte_sequence && byte_sequence_location_ == ByteSequenceLocation::Leading) { + session.appendBuffer(next_split, byte_sequence_); + } + return next_split; +} + +void SplitContent::finalizeLatestSplitContent(core::ProcessSession& session, const std::shared_ptr& latest_split, const std::vector& buffer) const { + const std::span data_without_byte_sequence{buffer.data(), buffer.size() - byte_sequence_.size()}; + session.appendBuffer(latest_split, data_without_byte_sequence); + if (keep_byte_sequence && byte_sequence_location_ == ByteSequenceLocation::Trailing) { + session.appendBuffer(latest_split, byte_sequence_); + } +} + +void SplitContent::finalizeLastSplitContent(core::ProcessSession& session, + std::vector>& splits, + const std::vector& buffer, + const bool ended_with_byte_sequence) const { + if (ended_with_byte_sequence && splits.back()->getSize() != 0) { + if (keep_byte_sequence && byte_sequence_location_ == ByteSequenceLocation::Leading) { + const auto last_split = session.create(); + if (!last_split) { + throw Exception(PROCESSOR_EXCEPTION, "Couldn't create FlowFile"); + } + splits.push_back(last_split); + session.appendBuffer(splits.back(), byte_sequence_); + } + } else { + session.appendBuffer(splits.back(), buffer); + } +} + +namespace { +std::shared_ptr createFirstSplit(core::ProcessSession& session) { + auto first_split = session.create(); + if (!first_split) { + throw Exception(PROCESSOR_EXCEPTION, "Couldn't create FlowFile"); + } + return first_split; +} + +void updateSplitAttributesAndTransfer(core::ProcessSession& session, const std::vector>& splits, const core::FlowFile& original) { + const std::string fragment_identifier_ = utils::IdGenerator::getIdGenerator()->generate().to_string(); + for (size_t split_i = 0; split_i < splits.size(); ++split_i) { + const auto& split = splits[split_i]; + split->setAttribute(SplitContent::FragmentCountOutputAttribute.name, std::to_string(splits.size())); + split->setAttribute(SplitContent::FragmentIndexOutputAttribute.name, std::to_string(split_i + 1)); // One based indexing + split->setAttribute(SplitContent::FragmentIdentifierOutputAttribute.name, fragment_identifier_); + split->setAttribute(SplitContent::SegmentOriginalFilenameOutputAttribute.name, original.getAttribute(core::SpecialFlowAttribute::FILENAME).value_or("")); + session.transfer(split, SplitContent::Splits); + } +} + +bool lastSplitIsEmpty(const std::vector>& splits) { + return splits.back()->getSize() != 0; +} +} // namespace + +void SplitContent::onTrigger(core::ProcessContext& context, core::ProcessSession& session) { + gsl_Assert(!byte_sequence_.empty()); + const auto original = session.get(); + if (!original) { + context.yield(); + return; + } + + const auto ff_content_stream = session.getFlowFileContentStream(*original); + if (!ff_content_stream) { + throw Exception(PROCESSOR_EXCEPTION, fmt::format("Couldn't access the ContentStream of {}", original->getUUID().to_string())); + } + std::vector buffer{}; + buffer.reserve(BUFFER_TARGET_SIZE + byte_sequence_.size()); + size_t matching_bytes = 0; + bool ended_with_byte_sequence = false; + std::vector> splits{}; + splits.push_back(createFirstSplit(session)); + + while (auto latest_byte = ff_content_stream->readByte()) { + buffer.push_back(*latest_byte); + if (ended_with_byte_sequence) { + ended_with_byte_sequence = false; + if (lastSplitIsEmpty(splits)) { + splits.push_back(createNewSplit(session)); + } + } + if (latest_byte == byte_sequence_[matching_bytes]) { + matching_bytes++; + if (matching_bytes == byte_sequence_.size()) { + // Found the Byte Sequence + finalizeLatestSplitContent(session, splits.back(), buffer); + ended_with_byte_sequence = true; + matching_bytes = 0; + buffer.clear(); + } + } else { + matching_bytes = 0; + if (buffer.size() >= BUFFER_TARGET_SIZE) { + session.appendBuffer(splits.back(), buffer); + buffer.clear(); + } + } + } + + finalizeLastSplitContent(session, splits, buffer, ended_with_byte_sequence); + + updateSplitAttributesAndTransfer(session, splits, *original); + session.transfer(original, Original); +} + +REGISTER_RESOURCE(SplitContent, Processor); + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/processors/SplitContent.h b/extensions/standard-processors/processors/SplitContent.h new file mode 100644 index 0000000000..ed23f34898 --- /dev/null +++ b/extensions/standard-processors/processors/SplitContent.h @@ -0,0 +1,130 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include +#include +#include + +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/PropertyDefinitionBuilder.h" +#include "core/PropertyDefinition.h" +#include "core/PropertyType.h" +#include "core/RelationshipDefinition.h" +#include "FlowFileRecord.h" +#include "utils/Export.h" + +namespace org::apache::nifi::minifi::processors { + +class SplitContent final : public core::Processor { + public: + explicit SplitContent(const std::string_view name, const utils::Identifier& uuid = {}) + : Processor(name, uuid) { + } + + enum class ByteSequenceFormat { + Hexadecimal, + Text + }; + + enum class ByteSequenceLocation { + Trailing, + Leading + }; + + EXTENSIONAPI static constexpr auto Description = "Splits incoming FlowFiles by a specified byte sequence"; + + EXTENSIONAPI static constexpr auto ByteSequenceFormatProperty = core::PropertyDefinitionBuilder<2>::createProperty("Byte Sequence Format") + .withDescription("Specifies how the property should be interpreted") + .isRequired(true) + .withDefaultValue(magic_enum::enum_name(ByteSequenceFormat::Hexadecimal)) + .withAllowedValues(magic_enum::enum_names()) + .build(); + + EXTENSIONAPI static constexpr auto ByteSequence = core::PropertyDefinitionBuilder<>::createProperty("Byte Sequence") + .withDescription("A representation of bytes to look for and upon which to split the source file into separate files") + .isRequired(true) + .withPropertyType(core::StandardPropertyTypes::NON_BLANK_TYPE) + .build(); + + EXTENSIONAPI static constexpr auto KeepByteSequence = core::PropertyDefinitionBuilder<>::createProperty("Keep Byte Sequence") + .withDescription("Determines whether or not the Byte Sequence should be included with each Split") + .withPropertyType(core::StandardPropertyTypes::BOOLEAN_TYPE) + .withDefaultValue("false") + .isRequired(true) + .build(); + + EXTENSIONAPI static constexpr auto ByteSequenceLocationProperty = core::PropertyDefinitionBuilder<2>::createProperty("Byte Sequence Location") + .withDescription("If is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; " + "if is false, this property is ignored.") + .withDefaultValue(magic_enum::enum_name(ByteSequenceLocation::Trailing)) + .withAllowedValues(magic_enum::enum_names()) + .isRequired(true) + .build(); + + + EXTENSIONAPI static constexpr auto Properties = std::to_array({ + ByteSequenceFormatProperty, + ByteSequence, + KeepByteSequence, + ByteSequenceLocationProperty + }); + + EXTENSIONAPI static constexpr auto Splits = core::RelationshipDefinition{"splits", "All Splits will be routed to the splits relationship"}; + EXTENSIONAPI static constexpr auto Original = core::RelationshipDefinition{"original", "The original file"}; + EXTENSIONAPI static constexpr auto Relationships = std::array{Original, Splits}; + + EXTENSIONAPI static constexpr auto FragmentIdentifierOutputAttribute = + core::OutputAttributeDefinition<0>{"fragment.identifier", {}, "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"}; + EXTENSIONAPI static constexpr auto FragmentIndexOutputAttribute = + core::OutputAttributeDefinition<0>{"fragment.index", {}, "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"}; + EXTENSIONAPI static constexpr auto FragmentCountOutputAttribute = + core::OutputAttributeDefinition<0>{"fragment.count", {}, "The number of split FlowFiles generated from the parent FlowFile"}; + EXTENSIONAPI static constexpr auto SegmentOriginalFilenameOutputAttribute = + core::OutputAttributeDefinition<0>{"segment.original.filename", {}, "The filename of the parent FlowFile"}; + EXTENSIONAPI static constexpr auto OutputAttributes = std::array{ + FragmentIdentifierOutputAttribute, + FragmentIndexOutputAttribute, + FragmentCountOutputAttribute, + SegmentOriginalFilenameOutputAttribute + }; + + EXTENSIONAPI static constexpr bool SupportsDynamicProperties = true; + EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; + EXTENSIONAPI static constexpr auto InputRequirement = core::annotation::Input::INPUT_REQUIRED; + EXTENSIONAPI static constexpr bool IsSingleThreaded = false; + ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS + + void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; + void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; + void initialize() override; + + private: + std::shared_ptr createNewSplit(core::ProcessSession& session) const; + void finalizeLatestSplitContent(core::ProcessSession& session, const std::shared_ptr& latest_split, const std::vector& buffer) const; + void finalizeLastSplitContent(core::ProcessSession& session, std::vector>& splits, const std::vector& buffer, bool ended_with_byte_sequence) const; + + std::vector byte_sequence_{}; + bool keep_byte_sequence = false; + ByteSequenceLocation byte_sequence_location_ = ByteSequenceLocation::Trailing; + std::shared_ptr logger_ = core::logging::LoggerFactory::getLogger(uuid_); +}; + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/tests/unit/SplitContentTests.cpp b/extensions/standard-processors/tests/unit/SplitContentTests.cpp new file mode 100644 index 0000000000..8c98c5cbff --- /dev/null +++ b/extensions/standard-processors/tests/unit/SplitContentTests.cpp @@ -0,0 +1,381 @@ +/** +* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "FlowFileRecord.h" +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "processors/SplitContent.h" +#include "unit/SingleProcessorTestController.h" +#include "catch2/generators/catch_generators.hpp" + + +namespace org::apache::nifi::minifi::processors::test { + +template +std::vector createByteVector(Bytes... bytes) { + return {static_cast(bytes)...}; +} + +TEST_CASE("TextFormatLeadingPosition", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text)); + split_content->setProperty(SplitContent::ByteSequence, "ub"); + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Leading)); + + auto trigger_results = controller.trigger("rub-a-dub-dub"); + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 4); + + CHECK(controller.plan->getContent(original[0]) == "rub-a-dub-dub"); + + CHECK(controller.plan->getContent(splits[0]) == "r"); + CHECK(controller.plan->getContent(splits[1]) == "ub-a-d"); + CHECK(controller.plan->getContent(splits[2]) == "ub-d"); + CHECK(controller.plan->getContent(splits[3]) == "ub"); +} + +TEST_CASE("TextFormatTrailingPosition", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text)); + split_content->setProperty(SplitContent::ByteSequence, "ub"); + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing)); + + auto trigger_results = controller.trigger("rub-a-dub-dub"); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 3); + + CHECK(controller.plan->getContent(original[0]) == "rub-a-dub-dub"); + + CHECK(controller.plan->getContent(splits[0]) == "rub"); + CHECK(controller.plan->getContent(splits[1]) == "-a-dub"); + CHECK(controller.plan->getContent(splits[2]) == "-dub"); +} + +TEST_CASE("TextFormatSplits", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text)); + split_content->setProperty(SplitContent::ByteSequence, "test"); + + constexpr std::string_view input_1 = "This is a test. This is another test. And this is yet another test. Finally this is the last Test."; + constexpr std::string_view input_2 = "This is a test. This is another test. And this is yet another test. Finally this is the last test"; + + const auto [keep_byte_sequence, byte_sequence_location, input, expected_splits] = GENERATE_REF( + std::make_tuple("true", "Leading", input_1, std::vector{"This is a ", "test. This is another ", "test. And this is yet another ", "test. Finally this is the last Test."}), + std::make_tuple("false", "Leading", input_1, std::vector{"This is a ", ". This is another ", ". And this is yet another ", ". Finally this is the last Test."}), + std::make_tuple("true", "Trailing", input_1, std::vector{"This is a test", ". This is another test", ". And this is yet another test", ". Finally this is the last Test."}), + std::make_tuple("false", "Trailing", input_1, std::vector{"This is a ", ". This is another ", ". And this is yet another ", ". Finally this is the last Test."}), + std::make_tuple("true", "Leading", input_2, std::vector{"This is a ", "test. This is another ", "test. And this is yet another ", "test. Finally this is the last ", "test"}), + std::make_tuple("true", "Trailing", input_2, std::vector{"This is a test", ". This is another test", ". And this is yet another test", ". Finally this is the last test"})); + + split_content->setProperty(SplitContent::KeepByteSequence, keep_byte_sequence); + split_content->setProperty(SplitContent::ByteSequenceLocationProperty, byte_sequence_location); + + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == expected_splits.size()); + + CHECK(controller.plan->getContent(original[0]) == input); + + + for (size_t i = 0; i < expected_splits.size(); ++i) { + auto split_i = controller.plan->getContent(splits[i]); + auto expected_i = expected_splits[i]; + CHECK(split_i == expected_i); + } +} + +TEST_CASE("SmallSplits", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::KeepByteSequence, "false"); + split_content->setProperty(SplitContent::ByteSequence, "FFFF"); + + const auto input_data = createByteVector(1, 2, 3, 4, 5, 0xFF, 0xFF, 0xFF, 5, 4, 3, 2, 1); + std::string_view input(reinterpret_cast(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 2); + + const auto expected_split_1 = createByteVector(1, 2, 3, 4, 5); + const auto expected_split_2 = createByteVector(0xFF, 5, 4, 3, 2, 1); + + CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1); + CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2); +} + +TEST_CASE("WithSingleByteSplit", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::KeepByteSequence, "false"); + split_content->setProperty(SplitContent::ByteSequence, "FF"); + + const auto input_data = createByteVector(1, 2, 3, 4, 5, 0xFF, 5, 4, 3, 2, 1); + std::string_view input(reinterpret_cast(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input); + + const auto original = trigger_results.at(processors::SplitContent::Original); + const auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 2); + + const auto expected_split_1 = createByteVector(1, 2, 3, 4, 5); + const auto expected_split_2 = createByteVector(5, 4, 3, 2, 1); + + CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1); + CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2); +} + +TEST_CASE("WithLargerSplit", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::KeepByteSequence, "false"); + split_content->setProperty(SplitContent::ByteSequence, "05050505"); + + const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 4, 3, 2, 1); + std::string_view input(reinterpret_cast(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input); + + const auto original = trigger_results.at(processors::SplitContent::Original); + const auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 2); + + const auto expected_split_1 = createByteVector(1, 2, 3, 4); + const auto expected_split_2 = createByteVector(5, 5, 4, 3, 2, 1); + + CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1); + CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2); +} + +TEST_CASE("KeepingSequence", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequence, "05050505"); + + const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5, 5, 5, 4, 3, 2, 1); + std::string_view input(reinterpret_cast(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input); + + const auto original = trigger_results.at(processors::SplitContent::Original); + const auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 2); + + const auto expected_split_1 = createByteVector(1, 2, 3, 4, 5, 5, 5, 5); + const auto expected_split_2 = createByteVector(5, 5, 4, 3, 2, 1); + + CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1); + CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2); +} + +TEST_CASE("EndsWithSequence", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::KeepByteSequence, "false"); + split_content->setProperty(SplitContent::ByteSequence, "05050505"); + + const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5); + std::string_view input(reinterpret_cast(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 1); + + auto expected_split = createByteVector(1, 2, 3, 4); + + CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split); +} + +TEST_CASE("EndsWithSequenceAndKeepSequence", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequence, "05050505"); + + const auto input_data = createByteVector(1, 2, 3, 4, 5, 5, 5, 5); + std::string_view input(reinterpret_cast(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 1); + + auto expected_split_1 = createByteVector(1, 2, 3, 4, 5, 5, 5, 5); + + CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1); +} + +TEST_CASE("StartsWithSequence", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::KeepByteSequence, "false"); + split_content->setProperty(SplitContent::ByteSequence, "05050505"); + + const auto input_data = createByteVector(5, 5, 5, 5, 1, 2, 3, 4); + std::string_view input(reinterpret_cast(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 1); + + auto expected_split = createByteVector(1, 2, 3, 4); + + CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split); +} + +TEST_CASE("StartsWithSequenceAndKeepSequence", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequence, "05050505"); + + const auto input_data = createByteVector(5, 5, 5, 5, 1, 2, 3, 4); + std::string_view input(reinterpret_cast(input_data.data()), input_data.size()); + + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(original.size() == 1); + REQUIRE(splits.size() == 2); + + auto expected_split_1 = createByteVector(5, 5, 5, 5); + auto expected_split_2 = createByteVector(1, 2, 3, 4); + + CHECK(controller.plan->getContentAsBytes(*splits[0]) == expected_split_1); + CHECK(controller.plan->getContentAsBytes(*splits[1]) == expected_split_2); +} + +TEST_CASE("NoSplitterInString", "[NiFi]") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text)); + split_content->setProperty(SplitContent::ByteSequence, ","); + split_content->setProperty(SplitContent::KeepByteSequence, "false"); + split_content->setProperty(SplitContent::ByteSequenceLocationProperty, magic_enum::enum_name(SplitContent::ByteSequenceLocation::Trailing)); + + constexpr std::string_view input = "UVAT"; + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(splits.size() == 1); + REQUIRE(original.size() == 1); + + CHECK(splits[0]->getAttribute("fragment.identifier").has_value()); + CHECK(splits[0]->getAttribute("segment.original.filename").has_value()); + + CHECK(splits[0]->getAttribute("fragment.count").value() == "1"); + CHECK(splits[0]->getAttribute("fragment.index").value() == "1"); + + CHECK(controller.plan->getContent(splits[0]) == input); + CHECK(controller.plan->getContent(original[0]) == input); +} + +TEST_CASE("ByteSequenceAtBufferTargetSize") { + const auto split_content = std::make_shared("SplitContent"); + minifi::test::SingleProcessorTestController controller{split_content}; + + auto [pre_fix_size, separator_size, post_fix_size] = GENERATE( + std::make_tuple(1020, 1020, 1020), + std::make_tuple(10, 10, 1020), + std::make_tuple(10, 1020, 10), + std::make_tuple(10, 10, 1020), + std::make_tuple(10, 1020, 1020), + std::make_tuple(1020, 10, 1020), + std::make_tuple(1020, 1020, 10), + std::make_tuple(2000, 1020, 10)); + + + const std::string pre_fix = utils::string::repeat("a", pre_fix_size); + const std::string separator = utils::string::repeat("b", separator_size); + const std::string post_fix = utils::string::repeat("c", post_fix_size); + + split_content->setProperty(SplitContent::ByteSequenceFormatProperty, magic_enum::enum_name(SplitContent::ByteSequenceFormat::Text)); + split_content->setProperty(SplitContent::ByteSequence, separator); + split_content->setProperty(SplitContent::KeepByteSequence, "true"); + split_content->setProperty(SplitContent::ByteSequenceLocationProperty, "Trailing"); + + auto input = pre_fix + separator + post_fix; + auto trigger_results = controller.trigger(input); + + auto original = trigger_results.at(processors::SplitContent::Original); + auto splits = trigger_results.at(processors::SplitContent::Splits); + + REQUIRE(splits.size() == 2); + REQUIRE(original.size() == 1); + + CHECK(controller.plan->getContent(splits[0]) == std::string(pre_fix) + std::string(separator)); + CHECK(controller.plan->getContent(splits[1]) == post_fix); + + CHECK(controller.plan->getContent(original[0]) == input); +} + +} // namespace org::apache::nifi::minifi::processors::test diff --git a/libminifi/include/io/InputStream.h b/libminifi/include/io/InputStream.h index f18c5655e0..ad24d6908d 100644 --- a/libminifi/include/io/InputStream.h +++ b/libminifi/include/io/InputStream.h @@ -81,6 +81,14 @@ class InputStream : public virtual Stream { return sizeof(Integral); } + + std::optional readByte() { + std::array buf{}; + if (read(buf) != 1) { + return std::nullopt; + } + return buf[0]; + } }; } // namespace org::apache::nifi::minifi::io diff --git a/libminifi/test/libtest/unit/TestBase.cpp b/libminifi/test/libtest/unit/TestBase.cpp index b83c1509c4..c254b38eaf 100644 --- a/libminifi/test/libtest/unit/TestBase.cpp +++ b/libminifi/test/libtest/unit/TestBase.cpp @@ -672,11 +672,20 @@ void TestPlan::validateAnnotations() const { } } +std::vector TestPlan::getContentAsBytes(const core::FlowFile& flow_file) const { + const auto content_claim = flow_file.getResourceClaim(); + const auto content_stream = content_repo_->read(*content_claim); + const auto output_stream = std::make_shared(); + std::ignore = minifi::InputStreamPipe{*output_stream}(content_stream); + auto content = output_stream->getBuffer().subspan(flow_file.getOffset(), flow_file.getSize()); + return ranges::to(content); +} + std::string TestPlan::getContent(const minifi::core::FlowFile& file) const { - auto content_claim = file.getResourceClaim(); - auto content_stream = content_repo_->read(*content_claim); - auto output_stream = std::make_shared(); - minifi::InputStreamPipe{*output_stream}(content_stream); + const auto content_claim = file.getResourceClaim(); + const auto content_stream = content_repo_->read(*content_claim); + const auto output_stream = std::make_shared(); + std::ignore = minifi::InputStreamPipe{*output_stream}(content_stream); return utils::span_to(minifi::utils::as_span(output_stream->getBuffer()).subspan(file.getOffset(), file.getSize())); } diff --git a/libminifi/test/libtest/unit/TestBase.h b/libminifi/test/libtest/unit/TestBase.h index 05826b2b65..94c90e9be1 100644 --- a/libminifi/test/libtest/unit/TestBase.h +++ b/libminifi/test/libtest/unit/TestBase.h @@ -285,6 +285,7 @@ class TestPlan { return state_storage_; } + std::vector getContentAsBytes(const core::FlowFile& flow_file) const; std::string getContent(const std::shared_ptr& file) const { return getContent(*file); } std::string getContent(const minifi::core::FlowFile& file) const;