Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ikasan 2381 windows scheduler agent failure terminates command execution jobs #1311

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
* @author Ikasan Development Team
*/
public class JobStartingBroker implements Broker<EnrichedContextualisedScheduledProcessEvent, EnrichedContextualisedScheduledProcessEvent>,
ConfiguredResource<JobStartingBrokerConfiguration>
ConfiguredResource<JobStartingBrokerConfiguration>
{
/** logger */
private static final Logger LOGGER = LoggerFactory.getLogger(JobStartingBroker.class);
Expand All @@ -77,7 +77,7 @@ public class JobStartingBroker implements Broker<EnrichedContextualisedScheduled

private String configuredResourceId;
private JobStartingBrokerConfiguration configuration;

private final DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME;
private final SchedulerPersistenceService schedulerPersistenceService;
public JobStartingBroker(SchedulerPersistenceService schedulerPersistenceService) {
Expand All @@ -87,6 +87,7 @@ public JobStartingBroker(SchedulerPersistenceService schedulerPersistenceService
@Override
public EnrichedContextualisedScheduledProcessEvent invoke(EnrichedContextualisedScheduledProcessEvent scheduledProcessEvent) throws EndpointException
{
LOGGER.info("Start JobStartingBroker with event " + scheduledProcessEvent);
scheduledProcessEvent.setJobStarting(true);
scheduledProcessEvent.setOutcome(Outcome.EXECUTION_INVOKED);

Expand All @@ -99,8 +100,7 @@ public EnrichedContextualisedScheduledProcessEvent invoke(EnrichedContextualised
// week that is defined.
if(scheduledProcessEvent.getInternalEventDrivenJob().getDaysOfWeekToRun() != null
&& !scheduledProcessEvent.getInternalEventDrivenJob().getDaysOfWeekToRun().isEmpty()
&& !scheduledProcessEvent.getInternalEventDrivenJob().getDaysOfWeekToRun()
.contains(Calendar.getInstance().get(Calendar.DAY_OF_WEEK))) {
&& !scheduledProcessEvent.getInternalEventDrivenJob().getDaysOfWeekToRun().contains(Calendar.getInstance().get(Calendar.DAY_OF_WEEK))) {
return scheduledProcessEvent;
}

Expand All @@ -109,25 +109,27 @@ public EnrichedContextualisedScheduledProcessEvent invoke(EnrichedContextualised
schedulerPersistenceService,
new ProcessBuilder(),
StringUtils.split(scheduledProcessEvent.getInternalEventDrivenJob().getExecutionEnvironmentProperties(), "|"),
scheduledProcessEvent.getProcessIdentity()
scheduledProcessEvent.generateProcessIdentity()
);
scheduledProcessEvent.setDetachableProcess(detachableProcessBuilder.getDetachableProcess());
detachableProcessBuilder.command(scheduledProcessEvent.getInternalEventDrivenJob().getCommandLine());

File outputLog;
File errorLog;
long fireTime;

if( ! scheduledProcessEvent.getDetachableProcess().isDetached()) {
// This is a new process, setup appropriately
if(scheduledProcessEvent.getInternalEventDrivenJob().getWorkingDirectory() != null
&& scheduledProcessEvent.getInternalEventDrivenJob().getWorkingDirectory().length() > 0) {
&& !scheduledProcessEvent.getInternalEventDrivenJob().getWorkingDirectory().isEmpty()) {
File workingDirectory = new File(scheduledProcessEvent.getInternalEventDrivenJob().getWorkingDirectory());
detachableProcessBuilder.directory(workingDirectory);
}
outputLog = new File(scheduledProcessEvent.getResultOutput());
FileUtil.createMissingParentDirectories(outputLog);
detachableProcessBuilder.setInitialResultOutput(outputLog.getAbsolutePath());
String formattedDate = formatter.format(LocalDateTime.now());

// Output log
if(outputLog.exists()) {
String newFileName = scheduledProcessEvent.getResultOutput() + "." + formattedDate;
if (!outputLog.renameTo(new File(newFileName))) {
Expand All @@ -136,6 +138,7 @@ public EnrichedContextualisedScheduledProcessEvent invoke(EnrichedContextualised
}
detachableProcessBuilder.redirectOutput(outputLog);

// Error log
errorLog = new File(scheduledProcessEvent.getResultError());
FileUtil.createMissingParentDirectories(errorLog);
detachableProcessBuilder.setInitialErrorOutput(errorLog.getAbsolutePath());
Expand All @@ -147,6 +150,11 @@ public EnrichedContextualisedScheduledProcessEvent invoke(EnrichedContextualised
}
detachableProcessBuilder.redirectError(errorLog);

// fire time
fireTime = scheduledProcessEvent.getFireTime();
detachableProcessBuilder.setInitialFireTime(fireTime);
detachableProcessBuilder.command(scheduledProcessEvent.getInternalEventDrivenJob().getCommandLine());

// Some environments like cmd.exe will not persist an environment value if it is empty. This will identify
// based on a list of environments provided in the broker configuration if a space should be added to the
// context parameter value if it is empty
Expand Down Expand Up @@ -177,21 +185,21 @@ public EnrichedContextualisedScheduledProcessEvent invoke(EnrichedContextualised
}

} else {
// Once detached, configuration on the process must not change, most will have been deserialized already.
// Once detached, the location of output/error/firetime MUST come from the serialized process.
outputLog = new File(detachableProcessBuilder.getInitialResultOutput());
errorLog = new File(detachableProcessBuilder.getInitialErrorOutput());
fireTime = detachableProcessBuilder.getInitialFireTime();
}

// These came from the event but may have been updated by a deserialized detached process, or fully qualified name.
scheduledProcessEvent.setResultOutput(outputLog.getAbsolutePath());
scheduledProcessEvent.setResultError(errorLog.getAbsolutePath());
scheduledProcessEvent.setFireTime(fireTime);

try {
// Start the process and enrich the payload.
StringBuffer processStartString = new StringBuffer("\nExecuting Job -> Context Name[")
.append(scheduledProcessEvent.getContextName())
.append("] Job Name[")
.append(scheduledProcessEvent.getInternalEventDrivenJob().getJobName())
.append("] Job Name[").append(scheduledProcessEvent.getInternalEventDrivenJob().getJobName())
.append("]\n\n");

processStartString.append("Job Parameters -> ").append("\n");
Expand All @@ -201,27 +209,21 @@ public EnrichedContextualisedScheduledProcessEvent invoke(EnrichedContextualised
}
else {
scheduledProcessEvent.getContextParameters()
.forEach(contextParameter -> processStartString.append("Name[")
.append(contextParameter.getName())
.append("] Value[")
.append(contextParameter.getValue())
.append("]")
.append("\n"));
.forEach(contextParameter -> processStartString
.append("Name[").append(contextParameter.getName())
.append("] Value[").append(contextParameter.getValue())
.append("]").append("\n"));
}

processStartString.append("Name[")
.append(LOG_FILE_PATH)
.append("] Value[")
.append(scheduledProcessEvent.getResultOutput())
.append("]")
.append("\n");
processStartString
.append("Name[").append(LOG_FILE_PATH)
.append("] Value[").append(scheduledProcessEvent.getResultOutput())
.append("]").append("\n");

processStartString.append("Name[")
.append(ERROR_LOG_FILE_PATH)
.append("] Value[")
.append(scheduledProcessEvent.getResultError())
.append("]")
.append("\n");
processStartString
.append("Name[").append(ERROR_LOG_FILE_PATH)
.append("] Value[").append(scheduledProcessEvent.getResultError())
.append("]").append("\n");

StringBuffer commandString = new StringBuffer("Process Command -> ").append("\n");
detachableProcessBuilder.command().stream().forEach(command -> commandString.append(command).append("\n\n"));
Expand All @@ -242,7 +244,6 @@ public EnrichedContextualisedScheduledProcessEvent invoke(EnrichedContextualised
catch (IOException e) {
throw new EndpointException(e);
}

return scheduledProcessEvent;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,23 @@ public void setDetachableProcess(DetachableProcess detachableProcess) {
this.detachableProcess = detachableProcess;
}

@JsonIgnore
/*
* The identity is used during persistence of the process details to assist restart after agent restart
* This identity is used to recover the persisted process's output/return code if the agent is restarted after
* a crash or an orderly shutdown occurred when a command execution job was in flight.
* With recurring job, the context instance ID does not change for each repetition but, the nature of the
* queueing mechanism means they each recurrence waits for the previous to finish, so they can't overlap,
* so this key is unique enough.
*/
public String getProcessIdentity() {
public String generateProcessIdentity() {
String despacedJobName = getJobName() == null ? getJobName() : getJobName().replaceAll(" ", "_");
return getContextInstanceId() + "-" + despacedJobName;
}

@JsonIgnore
public void setDetailsFromProcess() {
if (detachableProcess.isDetachedAlreadyFinished()) {
setUser("Detatched process");
setCommandLine("Detatched process");
setUser("Detached process");
setCommandLine("Detached process");
} else {
ProcessHandle.Info info = detachableProcess.getInfo();
if(info != null && info.user().isPresent()) {
Expand All @@ -63,7 +66,6 @@ public void setContextParameters(List<ContextParameterInstance> contextParameter
this.contextParameters = contextParameters;
}


/**
* Set the context parameters.
*
Expand All @@ -77,8 +79,21 @@ public List<ContextParameterInstance> getContextParameters() {
@Override
public String toString() {
return "EnrichedContextualisedScheduledProcessEvent{" +
// this
", detachableProcess=" + detachableProcess +
", contextParameters=" + contextParameters +

// ContextualisedScheduledProcessEventImpl
", contextId=" + getContextName() +
", childContextIds=" + getChildContextNames() +
", contextInstanceId=" + getContextInstanceId() +
", skipped=" + isSkipped() +
// getInternalEventDrivenJob() prevents further logging so is suppressed.
//", internalEventDrivenJob=" + getInternalEventDrivenJob() +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davidhilton68 please remove commented out line if no longer necessary

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, updates to remove the commented out code but kept the comment to stop anyone falling into same issue.

", raisedDueToFailureResubmission=" + isRaisedDueToFailureResubmission() +
", catalystEvent=" + getCatalystEvent() +

// ScheduledProcessEventImpl
", id=" + id +
", agentName='" + agentName + '\'' +
", agentHostname='" + agentHostname + '\'' +
Expand Down
Loading