Skip to content

Commit

Permalink
[GOBBLIN-1910] part one of the changes that dag refactoring will requ…
Browse files Browse the repository at this point in the history
…ire (apache#3853)

* part one of the changes that dag refactoring will require
* address review comments
---------
Co-authored-by: Meeth Gala <mgala@linkedin.com>
Co-authored-by: Arjun Singh Bora <abora@abora-mn1.linkedin.biz>
Co-authored-by: Arjun Singh Bora <abora@abora-mn3.linkedin.biz>
  • Loading branch information
arjun4084346 authored Jan 18, 2024
1 parent f95b36f commit 318c661
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,26 @@ public Iterator<JobStatus> getLatestJobStatusByFlowNameAndGroup(String flowName,
* @return deserialize {@link State} into a {@link JobStatus}.
*/
protected JobStatus getJobStatus(State jobState) {
  // Start from a builder pre-filled by createJobStatusBuilderFromState; only the issues are added here.
  JobStatus.JobStatusBuilder builder = createJobStatusBuilderFromState(jobState);

  String troubleshooterContextId = TroubleshooterUtils.getContextIdForJob(jobState.getProperties());

  // The repository lookup is wrapped in Suppliers.memoize so it is not executed while the status is built.
  Supplier<List<Issue>> lazyIssues = Suppliers.memoize(() -> {
    try {
      return issueRepository.getAll(troubleshooterContextId);
    } catch (TroubleshooterException e) {
      // Best effort: a failed lookup is logged and reported as "no issues" rather than propagated.
      log.warn("Cannot retrieve job issues", e);
      return Collections.<Issue>emptyList();
    }
  });

  return builder.issues(lazyIssues).build();
}

public static JobStatus.JobStatusBuilder createJobStatusBuilderFromState(State jobState) {
String flowGroup = getFlowGroup(jobState);
String flowName = getFlowName(jobState);
long flowExecutionId = getFlowExecutionId(jobState);
Expand All @@ -125,48 +145,35 @@ protected JobStatus getJobStatus(State jobState) {
int progressPercentage = jobState.getPropAsInt(TimingEvent.JOB_COMPLETION_PERCENTAGE, 0);
long lastProgressEventTime = jobState.getPropAsLong(TimingEvent.JOB_LAST_PROGRESS_EVENT_TIME, 0);

String contextId = TroubleshooterUtils.getContextIdForJob(jobState.getProperties());

Supplier<List<Issue>> jobIssues = Suppliers.memoize(() -> {
List<Issue> issues;
try {
issues = issueRepository.getAll(contextId);
} catch (TroubleshooterException e) {
log.warn("Cannot retrieve job issues", e);
issues = Collections.emptyList();
}
return issues;
});

return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).
jobName(jobName).jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).
lowWatermark(lowWatermark).highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime).
message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts).currentGeneration(currentGeneration).
shouldRetry(shouldRetry).progressPercentage(progressPercentage).lastProgressEventTime(lastProgressEventTime).
issues(jobIssues).build();
return JobStatus.builder().flowName(flowName).flowGroup(flowGroup).flowExecutionId(flowExecutionId).jobName(jobName)
.jobGroup(jobGroup).jobTag(jobTag).jobExecutionId(jobExecutionId).eventName(eventName).lowWatermark(lowWatermark)
.highWatermark(highWatermark).orchestratedTime(orchestratedTime).startTime(startTime).endTime(endTime)
.message(message).processedCount(processedCount).maxAttempts(maxAttempts).currentAttempts(currentAttempts)
.currentGeneration(currentGeneration).shouldRetry(shouldRetry).progressPercentage(progressPercentage)
.lastProgressEventTime(lastProgressEventTime);
}

protected final String getFlowGroup(State jobState) {
protected static final String getFlowGroup(State jobState) {
return jobState.getProp(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD);
}

protected final String getFlowName(State jobState) {
protected static final String getFlowName(State jobState) {
return jobState.getProp(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD);
}

protected final long getFlowExecutionId(State jobState) {
protected static final long getFlowExecutionId(State jobState) {
return Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.FLOW_EXECUTION_ID_FIELD));
}

protected final String getJobGroup(State jobState) {
protected static final String getJobGroup(State jobState) {
return jobState.getProp(TimingEvent.FlowEventConstants.JOB_GROUP_FIELD);
}

protected final String getJobName(State jobState) {
protected static final String getJobName(State jobState) {
return jobState.getProp(TimingEvent.FlowEventConstants.JOB_NAME_FIELD);
}

protected final long getJobExecutionId(State jobState) {
protected static final long getJobExecutionId(State jobState) {
return Long.parseLong(jobState.getProp(TimingEvent.FlowEventConstants.JOB_EXECUTION_ID_FIELD, "0"));
}

Expand All @@ -178,7 +185,9 @@ protected List<FlowStatus> asFlowStatuses(List<FlowExecutionJobStateGrouping> fl
return flowExecutionGroupings.stream().map(exec -> {
List<JobStatus> jobStatuses = ImmutableList.copyOf(asJobStatuses(exec.getJobStates().stream().sorted(
// rationalized order, to facilitate test assertions
Comparator.comparing(this::getJobGroup).thenComparing(this::getJobName).thenComparing(this::getJobExecutionId)
Comparator.comparing(JobStatusRetriever::getJobGroup)
.thenComparing(JobStatusRetriever::getJobName)
.thenComparing(JobStatusRetriever::getJobExecutionId)
).collect(Collectors.toList())));
return new FlowStatus(exec.getFlowName(), exec.getFlowGroup(), exec.getFlowExecutionId(), jobStatuses.iterator(),
getFlowStatusFromJobStatuses(jobStatuses.iterator()));
Expand All @@ -196,10 +205,8 @@ protected static class FlowExecutionJobStateGrouping {

protected List<FlowExecutionJobStateGrouping> groupByFlowExecutionAndRetainLatest(
String flowGroup, List<State> jobStatusStates, int maxCountPerFlowName) {
Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName =
jobStatusStates.stream().collect(Collectors.groupingBy(
this::getFlowName,
Collectors.groupingBy(this::getFlowExecutionId)));
Map<String, Map<Long, List<State>>> statesByFlowExecutionIdByName = jobStatusStates.stream().collect(
Collectors.groupingBy(JobStatusRetriever::getFlowName, Collectors.groupingBy(JobStatusRetriever::getFlowExecutionId)));

return statesByFlowExecutionIdByName.entrySet().stream().sorted(Map.Entry.comparingByKey()).flatMap(flowNameEntry -> {
String flowName = flowNameEntry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.gobblin.service.modules.flowgraph;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -26,15 +28,11 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import lombok.Getter;
import lombok.Setter;

import org.apache.gobblin.annotation.Alpha;


/**
* An implementation of Dag. Assumes that nodes have unique values. Nodes with duplicate values will produce
* unpredictable behavior.
Expand Down Expand Up @@ -261,6 +259,7 @@ public DagNode(T value) {
this.value = value;
}


public void addParentNode(DagNode<T> node) {
if (parentNodes == null) {
parentNodes = Lists.newArrayList(node);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ DagStateStore createDagStateStore(Config config, Map<URI, TopologySpec> topology
}

// Initializes and returns an array of numThreads queues
private static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
static LinkedBlockingDeque<?>[] initializeDagQueue(int numThreads) {
LinkedBlockingDeque<?>[] queue = new LinkedBlockingDeque[numThreads];

for (int i=0; i< numThreads; i++) {
Expand Down Expand Up @@ -324,16 +324,7 @@ public synchronized void addDag(Dag<JobExecutionPlan> dag, boolean persist, bool
throw new IOException("Could not add dag" + dagId + "to queue");
}
if (setStatus) {
submitEventsAndSetStatus(dag);
}
}

private void submitEventsAndSetStatus(Dag<JobExecutionPlan> dag) {
for (DagNode<JobExecutionPlan> dagNode : dag.getNodes()) {
JobExecutionPlan jobExecutionPlan = DagManagerUtils.getJobExecutionPlan(dagNode);
Map<String, String> jobMetadata = TimingEventUtils.getJobMetadata(Maps.newHashMap(), jobExecutionPlan);
new TimingEvent(eventSubmitter, TimingEvent.LauncherTimings.JOB_PENDING).stop(jobMetadata);
jobExecutionPlan.setExecutionStatus(PENDING);
DagManagerUtils.submitPendingExecStatus(dag, this.eventSubmitter);
}
}

Expand Down Expand Up @@ -642,15 +633,18 @@ private void beginResumingDag(DagId dagIdToResume) throws IOException {
*/
private void finishResumingDags() throws IOException {
for (Map.Entry<String, Dag<JobExecutionPlan>> dag : this.resumingDags.entrySet()) {
JobStatus flowStatus = pollFlowStatus(dag.getValue());
if (flowStatus == null || !flowStatus.getEventName().equals(PENDING_RESUME.name())) {
java.util.Optional<JobStatus> flowStatus = DagManagerUtils.pollFlowStatus(dag.getValue(), this.jobStatusRetriever, this.jobStatusPolledTimer);
if (!flowStatus.filter(fs -> fs.getEventName().equals(PENDING_RESUME.name())).isPresent()) {
continue;
}

boolean dagReady = true;
for (DagNode<JobExecutionPlan> node : dag.getValue().getNodes()) {
JobStatus jobStatus = pollJobStatus(node);
if (jobStatus == null || jobStatus.getEventName().equals(FAILED.name()) || jobStatus.getEventName().equals(CANCELLED.name())) {
java.util.Optional<JobStatus> jobStatus = DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever, this.jobStatusPolledTimer);
if (jobStatus.filter(js -> {
String jobName = js.getEventName();
return jobName.equals(FAILED.name()) || jobName.equals(CANCELLED.name());
}).isPresent()) {
dagReady = false;
break;
}
Expand Down Expand Up @@ -772,15 +766,15 @@ private void initialize(Dag<JobExecutionPlan> dag)
/**
* Advance the execution of each dag node based on its job status.
*/
private void pollAndAdvanceDag() throws IOException, ExecutionException, InterruptedException {
private void pollAndAdvanceDag() {
Map<String, Set<DagNode<JobExecutionPlan>>> nextSubmitted = Maps.newHashMap();
List<DagNode<JobExecutionPlan>> nodesToCleanUp = Lists.newArrayList();

for (DagNode<JobExecutionPlan> node : this.jobToDag.keySet()) {
try {
boolean slaKilled = slaKillIfNeeded(node);

JobStatus jobStatus = pollJobStatus(node);
java.util.Optional<JobStatus> jobStatus = DagManagerUtils.pollJobStatus(node, this.jobStatusRetriever, this.jobStatusPolledTimer);

boolean killOrphanFlow = killJobIfOrphaned(node, jobStatus);

Expand Down Expand Up @@ -815,13 +809,13 @@ private void pollAndAdvanceDag() throws IOException, ExecutionException, Interru
break;
}

if (jobStatus != null && jobStatus.isShouldRetry()) {
if (jobStatus.filter(JobStatus::isShouldRetry).isPresent()) {
log.info("Retrying job: {}, current attempts: {}, max attempts: {}", DagManagerUtils.getFullyQualifiedJobName(node),
jobStatus.getCurrentAttempts(), jobStatus.getMaxAttempts());
jobStatus.get().getCurrentAttempts(), jobStatus.get().getMaxAttempts());
this.jobToDag.get(node).setFlowEvent(null);
submitJob(node);
}
} catch (Exception e) {
} catch (InterruptedException | IOException | ExecutionException e) {
// Error occurred while processing dag, continue processing other dags assigned to this thread
log.error(String.format("Exception caught in DagManager while processing dag %s due to ",
DagManagerUtils.getFullyQualifiedDagName(node)), e);
Expand Down Expand Up @@ -850,14 +844,14 @@ private void pollAndAdvanceDag() throws IOException, ExecutionException, Interru
* @return true if the total time that the job remains in the ORCHESTRATED state exceeds
* {@value ConfigurationKeys#GOBBLIN_JOB_START_SLA_TIME}.
*/
private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobStatus)
private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, java.util.Optional<JobStatus> jobStatus)
throws ExecutionException, InterruptedException {
if (jobStatus == null) {
if (!jobStatus.isPresent()) {
return false;
}
ExecutionStatus executionStatus = valueOf(jobStatus.getEventName());
ExecutionStatus executionStatus = valueOf(jobStatus.get().getEventName());
long timeOutForJobStart = DagManagerUtils.getJobStartSla(node, this.defaultJobStartSlaTimeMillis);
long jobOrchestratedTime = jobStatus.getOrchestratedTime();
long jobOrchestratedTime = jobStatus.get().getOrchestratedTime();
if (executionStatus == ORCHESTRATED && System.currentTimeMillis() - jobOrchestratedTime > timeOutForJobStart) {
log.info("Job {} of flow {} exceeded the job start SLA of {} ms. Killing the job now...",
DagManagerUtils.getJobName(node),
Expand All @@ -875,15 +869,11 @@ private boolean killJobIfOrphaned(DagNode<JobExecutionPlan> node, JobStatus jobS
}
}

private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean killOrphanFlow, JobStatus jobStatus) {
private ExecutionStatus getJobExecutionStatus(boolean slaKilled, boolean killOrphanFlow, java.util.Optional<JobStatus> jobStatus) {
if (slaKilled || killOrphanFlow) {
return CANCELLED;
} else {
if (jobStatus == null) {
return PENDING;
} else {
return valueOf(jobStatus.getEventName());
}
return jobStatus.map(status -> valueOf(status.getEventName())).orElse(PENDING);
}
}

Expand Down Expand Up @@ -932,47 +922,7 @@ private boolean slaKillIfNeeded(DagNode<JobExecutionPlan> node) throws Execution
return false;
}

/**
 * Look up the {@link JobStatus} of the job held by a dag node.
 *
 * <p>The flow group, flow name, flow execution id, job group and job name are read from the config of the
 * node's job spec and handed to {@code pollStatus}.
 *
 * @param dagNode the node whose {@link JobExecutionPlan} identifies the job to look up
 * @return whatever {@code pollStatus} returns for those identifiers, which may be {@code null}
 */
private JobStatus pollJobStatus(DagNode<JobExecutionPlan> dagNode) {
  Config specConfig = dagNode.getValue().getJobSpec().getConfig();

  // Arguments are evaluated left to right, so the config keys are read in the same order as before.
  return pollStatus(
      specConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY),
      specConfig.getString(ConfigurationKeys.FLOW_NAME_KEY),
      specConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
      specConfig.getString(ConfigurationKeys.JOB_GROUP_KEY),
      specConfig.getString(ConfigurationKeys.JOB_NAME_KEY));
}

/**
 * Look up the flow-level {@link JobStatus} of a dag, i.e. the status stored with
 * {@link JobStatusRetriever#NA_KEY} as both job name and job group.
 *
 * <p>The flow group, flow name and flow execution id are taken from the config of the dag's first node.
 *
 * @param dag the dag whose flow status is wanted
 * @return {@code null} when {@code dag} is {@code null} or empty; otherwise whatever {@code pollStatus}
 *         returns, which may also be {@code null}
 */
private JobStatus pollFlowStatus(Dag<JobExecutionPlan> dag) {
  boolean nothingToPoll = dag == null || dag.isEmpty();
  if (nothingToPoll) {
    return null;
  }

  // The flow identifiers are read from the first node only.
  Config firstNodeConfig = dag.getNodes().get(0).getValue().getJobSpec().getConfig();

  return pollStatus(
      firstNodeConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY),
      firstNodeConfig.getString(ConfigurationKeys.FLOW_NAME_KEY),
      firstNodeConfig.getLong(ConfigurationKeys.FLOW_EXECUTION_ID_KEY),
      JobStatusRetriever.NA_KEY,
      JobStatusRetriever.NA_KEY);
}

/**
 * Query the job status retriever for one job of one flow execution and record how long the query took.
 *
 * @return the first status yielded by the retriever, or {@code null} if it yields none
 */
private JobStatus pollStatus(String flowGroup, String flowName, long flowExecutionId, String jobGroup, String jobName) {
  long startedAtNanos = System.nanoTime();
  // Note the retriever's argument order: name before group, for both the flow and the job.
  Iterator<JobStatus> statuses =
      this.jobStatusRetriever.getJobStatusesForFlowExecution(flowName, flowGroup, flowExecutionId, jobName, jobGroup);
  long elapsedNanos = System.nanoTime() - startedAtNanos;
  Instrumented.updateTimer(this.jobStatusPolledTimer, elapsedNanos, TimeUnit.NANOSECONDS);

  return statuses.hasNext() ? statuses.next() : null;
}

/**
* Submit next set of Dag nodes in the Dag identified by the provided dagId
Expand Down Expand Up @@ -1107,11 +1057,6 @@ private void addJobState(String dagId, DagNode<JobExecutionPlan> dagNode) {
}
}

/**
 * Tell whether {@code dagToJobs} holds at least one node for the given dag id.
 *
 * @param dagId key to look up in {@code dagToJobs}
 * @return {@code false} when there is no entry for {@code dagId} or its list is empty, {@code true} otherwise
 */
private boolean hasRunningJobs(String dagId) {
  List<DagNode<JobExecutionPlan>> trackedNodes = this.dagToJobs.get(dagId);
  if (trackedNodes == null) {
    return false;
  }
  return !trackedNodes.isEmpty();
}

/**
* Perform clean up. Remove a dag from the dagstore if the dag is complete and update internal state.
*/
Expand All @@ -1136,7 +1081,7 @@ private void cleanUp() {
deleteJobState(dagId, dagNode);
}
}
if (!hasRunningJobs(dagId)) {
if (!DagManagerUtils.hasRunningJobs(dagId, this.dagToJobs)) {
// Collect all the dagIds that are finished
this.dagIdstoClean.add(dagId);
if (dag.getFlowEvent() == null) {
Expand All @@ -1155,8 +1100,8 @@ private void cleanUp() {
for (Iterator<String> dagIdIterator = this.dagIdstoClean.iterator(); dagIdIterator.hasNext();) {
String dagId = dagIdIterator.next();
Dag<JobExecutionPlan> dag = this.dags.get(dagId);
JobStatus flowStatus = pollFlowStatus(dag);
if (flowStatus != null && FlowStatusGenerator.FINISHED_STATUSES.contains(flowStatus.getEventName())) {
java.util.Optional<JobStatus> flowStatus = DagManagerUtils.pollFlowStatus(dag, this.jobStatusRetriever, this.jobStatusPolledTimer);
if (flowStatus.filter(fs -> FlowStatusGenerator.FINISHED_STATUSES.contains(fs.getEventName())).isPresent()) {
FlowId flowId = DagManagerUtils.getFlowId(dag);
switch(dag.getFlowEvent()) {
case TimingEvent.FlowTimings.FLOW_SUCCEEDED:
Expand Down
Loading

0 comments on commit 318c661

Please sign in to comment.