Skip to content

Commit

Permalink
NIFI-10887: Addressed performance concerns. Use String.indexOf() ins…
Browse files Browse the repository at this point in the history
…tead of Pattern.matcher() when using Literal Replace. Use a NonFlushableOutputStream when ProcessSession.write() is called. Implemented hashCode() on AbstractConnection. Updated default Run Schedule on ReplaceText from 0 ms to 25 ms. Added a Surround Replacement strategy that allows both prepending and appending text. Updated unit tests to account for this.

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #6724
  • Loading branch information
markap14 authored and mattyb149 committed Jan 11, 2023
1 parent a9baa21 commit d5c79fd
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2989,7 +2989,8 @@ public OutputStream write(FlowFile source) {
ensureNotAppending(newClaim);

final OutputStream rawStream = claimCache.write(newClaim);
final OutputStream disableOnClose = new DisableOnCloseOutputStream(rawStream);
final OutputStream nonFlushable = new NonFlushableOutputStream(rawStream);
final OutputStream disableOnClose = new DisableOnCloseOutputStream(nonFlushable);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose);

final FlowFile sourceFlowFile = source;
Expand Down Expand Up @@ -3125,7 +3126,8 @@ public FlowFile write(FlowFile source, final OutputStreamCallback writer) {

ensureNotAppending(newClaim);
try (final OutputStream stream = claimCache.write(newClaim);
final OutputStream disableOnClose = new DisableOnCloseOutputStream(stream);
final NonFlushableOutputStream nonFlushableOutputStream = new NonFlushableOutputStream(stream);
final OutputStream disableOnClose = new DisableOnCloseOutputStream(nonFlushableOutputStream);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnClose)) {
try {
writeRecursionSet.add(source);
Expand Down Expand Up @@ -3417,7 +3419,8 @@ public FlowFile write(FlowFile source, final StreamCallback writer) {
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
final ByteCountingInputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
final OutputStream os = claimCache.write(newClaim);
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(os);
final OutputStream nonFlushableOut = new NonFlushableOutputStream(os);
final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(nonFlushableOut);
final ByteCountingOutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut)) {

writeRecursionSet.add(source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,15 @@ private static Map<PropertyDescriptor, String> resolvePropertyValues(final Compo
public PropertyValue getProperty(final PropertyDescriptor property) {
final String configuredValue = properties.get(property);

// We need to get the 'canonical representation' of the property descriptor from the component itself,
// since the supplied PropertyDescriptor may not have the proper default value.
final PropertyDescriptor resolvedDescriptor = component.getPropertyDescriptor(property.getName());
final String resolvedValue = (configuredValue == null) ? resolvedDescriptor.getDefaultValue() : configuredValue;
final String resolvedValue;
if (configuredValue == null) {
// We need to get the 'canonical representation' of the property descriptor from the component itself,
// since the supplied PropertyDescriptor may not have the proper default value.
final PropertyDescriptor resolvedDescriptor = component.getPropertyDescriptor(property.getName());
resolvedValue = resolvedDescriptor.getDefaultValue();
} else {
resolvedValue = configuredValue;
}

final ResourceContext resourceContext = new StandardResourceContext(new StandardResourceReferenceFactory(), property);
return new StandardPropertyValue(resourceContext, resolvedValue, serviceLookup, component.getParameterLookup(), preparedQueries.get(property), variableRegistry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,4 +500,9 @@ public List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredReco
public List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords) {
// Convenience overload: forwards to the three-argument poll(...) and supplies
// PollStrategy.UNPENALIZED_FLOWFILES as the poll strategy.
return poll(filter, expiredRecords, PollStrategy.UNPENALIZED_FLOWFILES);
}

/**
 * Derives this object's hash code solely from its {@code identifier} field.
 *
 * @return the value of {@code identifier.hashCode()}
 */
@Override
public int hashCode() {
    final int identifierHash = identifier.hashCode();
    return identifierHash;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.commons.io.IOUtils;
import org.apache.commons.text.StringSubstitutor;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
Expand Down Expand Up @@ -74,11 +75,11 @@

@EventDriven
@SideEffectFree
@SupportsBatching
@SupportsBatching(defaultDuration = DefaultRunDuration.TWENTY_FIVE_MILLIS)
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"Text", "Regular Expression", "Update", "Change", "Replace", "Modify", "Regex"})
@CapabilityDescription("Updates the content of a FlowFile by evaluating a Regular Expression (regex) against it and replacing the section of "
+ "the content that matches the Regular Expression with some alternate value.")
@CapabilityDescription("Updates the content of a FlowFile by searching for some textual value in the FlowFile content (via Regular Expression/regex, or literal value) and replacing the " +
"section of the content that matches with some alternate value. It can also be used to append or prepend text to the contents of a FlowFile.")
@SystemResourceConsideration(resource = SystemResource.MEMORY)
public class ReplaceText extends AbstractProcessor {

Expand All @@ -94,6 +95,7 @@ public class ReplaceText extends AbstractProcessor {
public static final String ENTIRE_TEXT = "Entire text";
public static final String prependValue = "Prepend";
public static final String appendValue = "Append";
public static final String surroundValue = "Surround";
public static final String regexReplaceValue = "Regex Replace";
public static final String literalReplaceValue = "Literal Replace";
public static final String alwaysReplace = "Always Replace";
Expand All @@ -114,6 +116,9 @@ public class ReplaceText extends AbstractProcessor {
+ "the value will be appended to each line. Similarly, for \"First-Line\", \"Last-Line\", \"Except-Last-Line\" and \"Except-First-Line\" Evaluation Modes,"
+ "the value will be appended to header alone, footer alone, all lines except header and all lines except footer respectively. For \"Entire Text\" evaluation mode,"
+ "the value will be appended to the entire text.");
/**
 * Allowable value for the "Surround" replacement strategy. The value and the display name are both
 * {@code surroundValue}; the third argument is the description text for this option.
 */
static final AllowableValue SURROUND = new AllowableValue(surroundValue, surroundValue,
"Prepends text before the start of the FlowFile (or the start of each line, depending on the configuration of the Evaluation Mode property) " +
"as well as appending text to the end of the FlowFile (or the end of each line, depending on the configuration of the Evaluation Mode property)");
static final AllowableValue LITERAL_REPLACE = new AllowableValue(literalReplaceValue, literalReplaceValue,
"Search for all instances of the Search Value and replace the matches with the Replacement Value.");
static final AllowableValue REGEX_REPLACE = new AllowableValue(regexReplaceValue, regexReplaceValue,
Expand All @@ -127,13 +132,22 @@ public class ReplaceText extends AbstractProcessor {
"Substitute variable references (specified in ${var} form) using FlowFile attributes for looking up the replacement value by variable name. "
+ "When this strategy is chosen, both the <Search Value> and <Replacement Value> properties are ignored.");


/**
 * Required property that selects the replacement strategy. The allowable values are PREPEND, APPEND,
 * SURROUND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE and SUBSTITUTE_VARIABLES; the default is
 * the value of REGEX_REPLACE. Other descriptors in this class reference it through {@code dependsOn(...)}.
 */
public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
.name("Replacement Strategy")
.description("The strategy for how and what to replace within the FlowFile's text content.")
.allowableValues(PREPEND, APPEND, SURROUND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
.defaultValue(REGEX_REPLACE.getValue())
.required(true)
.build();
/**
 * Required property holding the value to search for in the FlowFile content. Its property name is
 * "Regular Expression" while its display name is "Search Value". It defaults to {@code DEFAULT_REGEX},
 * accepts any value ({@code Validator.VALID}), supports Expression Language at FlowFile-attribute scope,
 * and is declared as depending on REPLACEMENT_STRATEGY being REGEX_REPLACE or LITERAL_REPLACE.
 */
public static final PropertyDescriptor SEARCH_VALUE = new PropertyDescriptor.Builder()
.name("Regular Expression")
.displayName("Search Value")
.description("The Search Value to search for in the FlowFile content. Only used for 'Literal Replace' and 'Regex Replace' matching strategies")
.required(true)
.addValidator(Validator.VALID)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
// Declares the dependency on the strategy property; listed values are the two search-based strategies.
.dependsOn(REPLACEMENT_STRATEGY, REGEX_REPLACE, LITERAL_REPLACE)
.defaultValue(DEFAULT_REGEX)
.build();
public static final PropertyDescriptor REPLACEMENT_VALUE = new PropertyDescriptor.Builder()
Expand All @@ -145,6 +159,25 @@ public class ReplaceText extends AbstractProcessor {
.required(true)
.defaultValue(DEFAULT_REPLACEMENT_VALUE)
.addValidator(Validator.VALID)
.dependsOn(REPLACEMENT_STRATEGY, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, PREPEND, APPEND)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
 * Required property holding the text to prepend. It is declared as depending on REPLACEMENT_STRATEGY
 * being SURROUND, accepts any value ({@code Validator.VALID}), and supports Expression Language at
 * FlowFile-attribute scope.
 */
static final PropertyDescriptor PREPEND_TEXT = new PropertyDescriptor.Builder()
    .name("Text to Prepend")
    .displayName("Text to Prepend")
    // Typo fix in the user-facing description: "teh" -> "the".
    .description("The text to prepend to the start of the FlowFile, or each line, depending on the configured value of the Evaluation Mode property")
    .required(true)
    .addValidator(Validator.VALID)
    .dependsOn(REPLACEMENT_STRATEGY, SURROUND)
    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    .build();
/**
 * Required property holding the text to append. It is declared as depending on REPLACEMENT_STRATEGY
 * being SURROUND, accepts any value ({@code Validator.VALID}), and supports Expression Language at
 * FlowFile-attribute scope.
 */
static final PropertyDescriptor APPEND_TEXT = new PropertyDescriptor.Builder()
    .name("Text to Append")
    .displayName("Text to Append")
    // Typo fix in the user-facing description: "teh" -> "the".
    .description("The text to append to the end of the FlowFile, or each line, depending on the configured value of the Evaluation Mode property")
    .required(true)
    .addValidator(Validator.VALID)
    .dependsOn(REPLACEMENT_STRATEGY, SURROUND)
    .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
    .build();
public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder()
Expand All @@ -166,13 +199,6 @@ public class ReplaceText extends AbstractProcessor {
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
.defaultValue("1 MB")
.build();
public static final PropertyDescriptor REPLACEMENT_STRATEGY = new PropertyDescriptor.Builder()
.name("Replacement Strategy")
.description("The strategy for how and what to replace within the FlowFile's text content.")
.allowableValues(PREPEND, APPEND, REGEX_REPLACE, LITERAL_REPLACE, ALWAYS_REPLACE, SUBSTITUTE_VARIABLES)
.defaultValue(REGEX_REPLACE.getValue())
.required(true)
.build();
public static final PropertyDescriptor EVALUATION_MODE = new PropertyDescriptor.Builder()
.name("Evaluation Mode")
.description("Run the 'Replacement Strategy' against each line separately (Line-by-Line) or buffer the entire file "
Expand All @@ -191,6 +217,8 @@ public class ReplaceText extends AbstractProcessor {
.required(false)
.build();



// Relationships
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
Expand All @@ -209,11 +237,13 @@ public class ReplaceText extends AbstractProcessor {
@Override
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(REPLACEMENT_STRATEGY);
properties.add(SEARCH_VALUE);
properties.add(REPLACEMENT_VALUE);
properties.add(PREPEND_TEXT);
properties.add(APPEND_TEXT);
properties.add(CHARACTER_SET);
properties.add(MAX_BUFFER_SIZE);
properties.add(REPLACEMENT_STRATEGY);
properties.add(EVALUATION_MODE);
properties.add(LINE_BY_LINE_EVALUATION_MODE);
this.properties = Collections.unmodifiableList(properties);
Expand Down Expand Up @@ -271,7 +301,10 @@ public void setup(ProcessContext context) {
replacementStrategyExecutor = new PrependReplace();
break;
case appendValue:
replacementStrategyExecutor = new AppendReplace();
replacementStrategyExecutor = new SurroundReplace(null, REPLACEMENT_VALUE);
break;
case surroundValue:
replacementStrategyExecutor = new SurroundReplace(PREPEND_TEXT, APPEND_TEXT);
break;
case regexReplaceValue:
// for backward compatibility - if replacement regex is ".*" then we will simply always replace the content.
Expand Down Expand Up @@ -454,23 +487,39 @@ public boolean isAllDataBufferedForEntireText() {

}

private static class AppendReplace implements ReplacementStrategyExecutor {
private static class SurroundReplace implements ReplacementStrategyExecutor {
private final PropertyDescriptor prependValueDescriptor;
private final PropertyDescriptor appendValueDescriptor;

public SurroundReplace(final PropertyDescriptor prependValueDescriptor, final PropertyDescriptor appendValueDescriptor) {
this.prependValueDescriptor = prependValueDescriptor;
this.appendValueDescriptor = appendValueDescriptor;
}

@Override
public FlowFile replace(FlowFile flowFile, final ProcessSession session, final ProcessContext context, final String evaluateMode, final Charset charset, final int maxBufferSize) {
final String replacementValue = context.getProperty(REPLACEMENT_VALUE).evaluateAttributeExpressions(flowFile).getValue();
final String prependValue = (prependValueDescriptor == null) ? null : context.getProperty(prependValueDescriptor).evaluateAttributeExpressions(flowFile).getValue();
final String appendValue = context.getProperty(appendValueDescriptor).evaluateAttributeExpressions(flowFile).getValue();

if (evaluateMode.equalsIgnoreCase(ENTIRE_TEXT)) {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream in, final OutputStream out) throws IOException {
if (prependValue != null && !prependValue.isEmpty()) {
out.write(prependValue.getBytes(charset));
}

IOUtils.copy(in, out);
out.write(replacementValue.getBytes(charset));
out.write(appendValue.getBytes(charset));
}
});
} else {
flowFile = session.write(flowFile, new StreamReplaceCallback(charset, maxBufferSize, context.getProperty(LINE_BY_LINE_EVALUATION_MODE).getValue(),
(bw, oneLine) -> {
if (prependValue != null && !prependValue.isEmpty()) {
bw.write(prependValue);
}

// we need to find the first carriage return or new-line so that we can append the new value
// before the line separate. However, we don't want to do this using a regular expression due
// to performance concerns. So we will find the first occurrence of either \r or \n and use
Expand All @@ -484,15 +533,15 @@ public void process(final InputStream in, final OutputStream out) throws IOExcep
}

if (c == '\r' || c == '\n') {
bw.write(replacementValue);
bw.write(appendValue);
foundNewLine = true;
}

bw.write(c);
}

if (!foundNewLine) {
bw.write(replacementValue);
bw.write(appendValue);
}
}));
}
Expand Down Expand Up @@ -641,13 +690,14 @@ public void process(final InputStream in, final OutputStream out) throws IOExcep
int matches = 0;
int lastEnd = 0;

final Matcher matcher = searchPattern.matcher(oneLine);
while (matcher.find()) {
bw.write(oneLine, lastEnd, matcher.start() - lastEnd);
int index = oneLine.indexOf(searchValue, lastEnd);
while (index >= 0) {
bw.write(oneLine, lastEnd, index - lastEnd);
bw.write(replacementValue);
matches++;

lastEnd = matcher.end();
lastEnd = index + searchValue.length();
index = oneLine.indexOf(searchValue, lastEnd);
}

if (matches > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1631,6 +1631,41 @@ public void testProcessorConfigurationRegexNotValid() {
runner.assertValid();
}

/**
 * With the Surround strategy and Entire Text evaluation mode, the prepend text must appear once before
 * the whole content and the append text once after it.
 */
@Test
public void testSurroundWithEntireText() {
    final String content = "Hello\nThere\nHow are you\nToday?";

    // Configure the processor for a whole-content surround.
    final TestRunner testRunner = getRunner();
    testRunner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.SURROUND);
    testRunner.setProperty(ReplaceText.PREPEND_TEXT, "<pre>");
    testRunner.setProperty(ReplaceText.APPEND_TEXT, "<post>");
    testRunner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.ENTIRE_TEXT);

    testRunner.enqueue(content);
    testRunner.run();

    // Exactly one FlowFile is expected on the success relationship.
    testRunner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
    final MockFlowFile result = testRunner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
    result.assertContentEquals("<pre>" + content + "<post>");
}

/**
 * With the Surround strategy and Line-by-Line evaluation mode, every line must be wrapped individually,
 * with the append text placed before each line terminator.
 */
@Test
public void testSurroundLineByLine() {
    final String content = "Hello\nThere\nHow are you\nToday?";

    // Configure the processor for a per-line surround.
    final TestRunner testRunner = getRunner();
    testRunner.setProperty(ReplaceText.REPLACEMENT_STRATEGY, ReplaceText.SURROUND);
    testRunner.setProperty(ReplaceText.PREPEND_TEXT, "<pre>");
    testRunner.setProperty(ReplaceText.APPEND_TEXT, "<post>");
    testRunner.setProperty(ReplaceText.EVALUATION_MODE, ReplaceText.LINE_BY_LINE);

    testRunner.enqueue(content);
    testRunner.run();

    // Exactly one FlowFile is expected on the success relationship.
    testRunner.assertAllFlowFilesTransferred(ReplaceText.REL_SUCCESS, 1);
    final MockFlowFile result = testRunner.getFlowFilesForRelationship(ReplaceText.REL_SUCCESS).get(0);
    result.assertContentEquals("<pre>Hello<post>\n<pre>There<post>\n<pre>How are you<post>\n<pre>Today?<post>");
}


@Test
public void testBackReferenceEscapeWithRegexReplaceUsingEL() {
final TestRunner runner = getRunner();
Expand Down

0 comments on commit d5c79fd

Please sign in to comment.