Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-10449: Fix ScriptedLookupServices to reload after disabled and first validation after changes made #6371

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,10 @@ public void disableControllerService(final ControllerService service) {
}

try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service);
// Create a config context to pass into the controller service's OnDisabled method (it will be ignored if the controller service has no arguments)
final MockConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), context, variableRegistry);
configContext.setValidateExpressions(validateExpressionUsage);
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service, configContext);
} catch (final Exception e) {
e.printStackTrace();
Assertions.fail("Failed to disable Controller Service " + service + " due to " + e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
Expand All @@ -35,6 +37,7 @@
import org.apache.nifi.script.AbstractScriptedControllerService;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.script.impl.FilteredPropertiesValidationContextAdapter;

import javax.script.Invocable;
import javax.script.ScriptContext;
Expand All @@ -46,6 +49,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -105,7 +109,7 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
}
} catch (final Throwable t) {
final ComponentLog logger = getLogger();
final String message = "Unable to get property descriptors from Processor: " + t;
final String message = "Unable to get property descriptors from LookupService: " + t;

logger.error(message);
if (logger.isDebugEnabled()) {
Expand Down Expand Up @@ -148,16 +152,29 @@ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String
*/
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
validationResults.set(new HashSet<>());

final ComponentLog logger = getLogger();
final ConfigurableComponent instance = lookupService.get();
final LookupService<?> instance = lookupService.get();

if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {

// Update the ScriptingComponentHelper's value(s)
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)) {
scriptingComponentHelper.setScriptPath(newValue);
} else if (ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)) {
scriptingComponentHelper.setScriptBody(newValue);
} else if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptingComponentHelper.setScriptEngineName(newValue);
}

scriptNeedsReload.set(true);
scriptRunner = null; //reset engine. This happens only when the controller service is disabled, so there won't be any performance impact in run-time.
} else if (instance != null) {
// If the script provides a ConfigurableComponent, call its onPropertyModified() method
// If the script provides a LookupService, call its onPropertyModified() method
try {
instance.onPropertyModified(descriptor, oldValue, newValue);
} catch (final Exception e) {
Expand All @@ -167,6 +184,71 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
}
}

@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
Collection<ValidationResult> commonValidationResults = super.customValidate(context);
if (!commonValidationResults.isEmpty()) {
return commonValidationResults;
}

// do not try to build processor/compile/etc until onPropertyModified clear the validation error/s
// and don't print anything into log.
if (!validationResults.get().isEmpty()) {
return validationResults.get();
}

Collection<ValidationResult> scriptingComponentHelperResults = scriptingComponentHelper.customValidate(context);
if (scriptingComponentHelperResults != null && !scriptingComponentHelperResults.isEmpty()) {
validationResults.set(scriptingComponentHelperResults);
return scriptingComponentHelperResults;
}

scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue());
scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue());
scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue());
final ResourceReferences resourceReferences = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().asResources();
scriptingComponentHelper.setModules(resourceReferences);
setup();

// Now that the component is validated, we can call validate on the scripted lookup service
final LookupService<?> instance = lookupService.get();
final Collection<ValidationResult> currentValidationResults = validationResults.get();

// if there was existing validation errors and the processor loaded successfully
if (currentValidationResults.isEmpty() && instance != null) {
try {
// defer to the underlying controller service for validation, without the
// lookup service's properties
final Set<PropertyDescriptor> innerPropertyDescriptor = new HashSet<>(scriptingComponentHelper.getDescriptors());

ValidationContext innerValidationContext = new FilteredPropertiesValidationContextAdapter(context, innerPropertyDescriptor);
final Collection<ValidationResult> instanceResults = instance.validate(innerValidationContext);

if (instanceResults != null && instanceResults.size() > 0) {
// return the validation results from the underlying instance
return instanceResults;
}
} catch (final Exception e) {
final ComponentLog logger = getLogger();
final String message = "Unable to validate the scripted LookupService: " + e;
logger.error(message, e);

// return a new validation message
final Collection<ValidationResult> results = new HashSet<>();
results.add(new ValidationResult.Builder()
.subject("Validation")
.valid(false)
.explanation("An error occurred calling validate in the configured scripted LookupService.")
.input(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).getValue())
.build());
return results;
}
}

return currentValidationResults;

}

@Override
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
Expand Down Expand Up @@ -226,7 +308,8 @@ public void onDisabled(final ConfigurationContext context) {
}
}
} else {
throw new ScriptException("No LookupService was defined by the script.");
// This might be due to an error during compilation, log it rather than throwing an exception
getLogger().warn("No LookupService was defined by the script.");
}
} catch (ScriptException se) {
throw new ProcessException("Error executing onDisabled(context) method", se);
Expand All @@ -235,6 +318,10 @@ public void onDisabled(final ConfigurationContext context) {
} else {
throw new ProcessException("Error creating ScriptRunner");
}

scriptingComponentHelper.stop();
lookupService.set(null);
scriptRunner = null;
}

@Override
Expand Down Expand Up @@ -262,7 +349,7 @@ protected boolean reloadScript(final String scriptBody) {
final Collection<ValidationResult> results = new HashSet<>();

try {
// Create a single script engine, the Processor object is reused by each task
// Create a single script engine, the LookupService object is reused by each task
if (scriptRunner == null) {
scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public void setupScriptRunners(final boolean newQ, final int numberOfScriptEngin
}

// Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs
final String[] locations = modules.asLocations().toArray(new String[0]);
final String[] locations = (modules == null) ? new String[0] : modules.asLocations().toArray(new String[0]);
final URL[] additionalClasspathURLs = ScriptRunnerFactory.getInstance().getModuleURLsForClasspath(scriptEngineName, locations, log);


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,10 @@ class TestScriptedLookupService {
runner.addControllerService("lookupService", scriptedLookupService)
runner.setProperty(scriptedLookupService, "Script Engine", "Groovy")
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_BODY, (String) null)
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_FILE, ALTERNATE_TARGET_PATH.toString())
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_FILE, TARGET_PATH.toString())
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.MODULES, (String) null)
// This call to setup should fail loading the script but should mark that the script should be reloaded
scriptedLookupService.setup()
// This prevents the (lookupService == null) check from passing, in order to force the reload from the
// scriptNeedsReload variable specifically
scriptedLookupService.lookupService.set(new MockScriptedLookupService())

runner.enableControllerService(scriptedLookupService)

MockFlowFile mockFlowFile = new MockFlowFile(1L)
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)

Optional opt = scriptedLookupService.lookup(['key':'Hello'])
assertTrue(opt.present)
assertEquals('Hi', opt.get())
Expand All @@ -133,6 +124,20 @@ class TestScriptedLookupService {
assertEquals('there', opt.get())
opt = scriptedLookupService.lookup(['key':'Not There'])
assertFalse(opt.present)

// Disable and load different script
runner.disableControllerService(scriptedLookupService)
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_FILE, ALTERNATE_TARGET_PATH.toString())
runner.enableControllerService(scriptedLookupService)

opt = scriptedLookupService.lookup(['key':'Hello'])
assertTrue(opt.present)
assertEquals('Goodbye', opt.get())
opt = scriptedLookupService.lookup(['key':'World'])
assertTrue(opt.present)
assertEquals('Stranger', opt.get())
opt = scriptedLookupService.lookup(['key':'Not There'])
assertFalse(opt.present)
}

class MockScriptedLookupService extends ScriptedLookupService implements AccessibleScriptingComponentHelper {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import org.apache.nifi.reporting.InitializationException
class SimpleGroovyLookupService implements StringLookupService {

def lookupTable = [
'Hello': 'Hi',
'World': 'there'
'Hello': 'Goodbye',
'World': 'Stranger'
]


Expand Down