Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose jobVersion, jobJarUrl as part of RuntimeTask interface #689

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class ExecuteStageRequest implements Serializable {
@Nullable
private final String nameOfJobProviderClass;
private final String user;
private final String jobVersion;

@JsonCreator
@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -102,7 +103,8 @@ public ExecuteStageRequest(
@JsonProperty("minRuntimeSecs") long minRuntimeSecs,
@JsonProperty("workerPorts") WorkerPorts workerPorts,
@JsonProperty("nameOfJobProviderClass") Optional<String> nameOfJobProviderClass,
@JsonProperty("user") String user) {
@JsonProperty("user") String user,
@JsonProperty("jobVersion") String jobVersion) {
this.jobName = jobName;
this.jobId = jobId;
this.workerIndex = workerIndex;
Expand All @@ -128,6 +130,7 @@ public ExecuteStageRequest(
this.subscriptionTimeoutSecs = subscriptionTimeoutSecs;
this.minRuntimeSecs = minRuntimeSecs;
this.workerPorts = workerPorts;
this.jobVersion = jobVersion;
}

public boolean getHasJobMaster() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class JobMetadata {

private final String jobId;
private final URL jobJarUrl;
private final String jobVersion;
private final int totalStages;
private final String user;
private final SchedulingInfo schedulingInfo;
Expand All @@ -37,6 +38,7 @@ public class JobMetadata {

public JobMetadata(final String jobId,
final URL jobJarUrl,
final String jobVersion,
final int totalStages,
final String user,
final SchedulingInfo schedulingInfo,
Expand All @@ -46,6 +48,7 @@ public JobMetadata(final String jobId,
final long minRuntimeSecs) {
this.jobId = jobId;
this.jobJarUrl = jobJarUrl;
this.jobVersion = jobVersion;
this.totalStages = totalStages;
this.user = user;
this.schedulingInfo = schedulingInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ public void setup() throws Exception {
1L,
new WorkerPorts(2, 3, 4, 5, 6),
java.util.Optional.of("className"),
"user1");
"user1",
"111");
example2 = new ExecuteStageRequest("jobName", "jobId-0", 0, 1,
new URL("http://datamesh/whatever"),
1, 1,
Expand All @@ -70,7 +71,8 @@ public void setup() throws Exception {
1L,
new WorkerPorts(2, 3, 4, 5, 6),
java.util.Optional.empty(),
"user1");
"user1",
"111");
}

@Test
Expand Down Expand Up @@ -148,6 +150,7 @@ public void testIfExecuteStageRequestIsSerializableAndDeserializableFromJackson(
" },\n" +
" \"nameOfJobProviderClass\": \"className\",\n" +
" \"user\": \"user1\",\n" +
" \"jobVersion\": \"111\",\n" +
" \"hasJobMaster\": false,\n" +
" \"jobId\": \"jobId-0\",\n" +
" \"workerId\":\n" +
Expand Down Expand Up @@ -231,6 +234,7 @@ public void testIfExecuteStageRequestIsSerializableAndDeserializableFromJacksonW
" },\n" +
" \"nameOfJobProviderClass\": null,\n" +
" \"user\": \"user1\",\n" +
" \"jobVersion\": \"111\",\n" +
" \"hasJobMaster\": false,\n" +
" \"jobId\": \"jobId-0\",\n" +
" \"workerId\":\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void testGetJobArtifact() throws Exception {
Lists.newArrayList(),
Lists.newArrayList()).build();
JobMetadata jobMetadata = new JobMetadata(
"testId", new URL("http://artifact.zip"),1,"testUser",schedulingInfo, Lists.newArrayList(),0,10, 0);
"testId", new URL("http://artifact.zip"), "111", 1,"testUser",schedulingInfo, Lists.newArrayList(),0,10, 0);
assertEquals(jobMetadata.getJobArtifact(), ArtifactID.of("artifact.zip"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1630,6 +1630,7 @@ private ScheduleRequest createSchedulingRequest(
JobMetadata jobMetadata = new JobMetadata(
mantisJobMetaData.getJobId().getId(),
mantisJobMetaData.getJobJarUrl(),
mantisJobMetaData.getJobDefinition().getVersion(),
mantisJobMetaData.getTotalStages(),
mantisJobMetaData.getUser(),
mantisJobMetaData.getSchedulingInfo(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public ExecuteStageRequest of(
scheduleRequest.getJobMetadata().getMinRuntimeSecs() - (System.currentTimeMillis() - scheduleRequest.getJobMetadata().getMinRuntimeSecs()),
matchedTaskExecutorInfo.getWorkerPorts(),
Optional.empty(),
scheduleRequest.getJobMetadata().getUser());
scheduleRequest.getJobMetadata().getUser(),
scheduleRequest.getJobMetadata().getJobVersion());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static ScheduleRequest createFakeScheduleRequest(final WorkerId workerId,
stageNum,
new JobMetadata(mantisJobMetadata.getJobId().getId(),
mantisJobMetadata.getJobJarUrl(),
mantisJobMetadata.getJobDefinition().getVersion(),
mantisJobMetadata.getTotalStages(),
mantisJobMetadata.getUser(),
mantisJobMetadata.getSchedulingInfo(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ public void testJobSubmitPerpetual() {
verify(schedulerMock,times(1)).scheduleWorkers(any());

JobMetadata jobMetadata = new JobMetadata(jobId, new URL("http://myart" +
""),1,"njoshi",schedInfo,Lists.newArrayList(),0,10, 0);
""),"111", 1,"njoshi",schedInfo,Lists.newArrayList(),0,10, 0);
ScheduleRequest scheduleRequest = new ScheduleRequest(
workerId, 1, jobMetadata,MantisJobDurationType.Perpetual, SchedulingConstraints.of(machineDefinition),0);
BatchScheduleRequest expectedRequest = new BatchScheduleRequest(Collections.singletonList(scheduleRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ void initialize(
UserCodeClassLoader userCodeClassLoader);

String getWorkerId();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer we do not add specific metrics/metadata into these entry-interfaces. can you elaborate on why it's on the runtimeTask for metrics instead of using regular metrics collector tagging? I feel like there are existing metrics path to do this without adding into this interface.

Copy link
Collaborator Author

@sarahwada-stripe sarahwada-stripe Jul 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Andyz26 thanks for the review!

can you elaborate on why it's on the runtimeTask for metrics instead of using regular metrics collector tagging?

The job metadata is only available as part of the job submission, not the actual running job itself. From my understanding, we need to hook into the task/job submission to get information like version. Right now, we have written our own AgentMain class, and we can hook into the TaskExecutor onTaskStarting to add custom job labels, like the scaffolding that exists here:

.

Given the context above, do you have a recommendation for a better place to make this change? Happy to move things around, it's possible I've missed a better metrics path when going through the code.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imo since agent to inner task is not always 1 to 1 it would be more consistent to construct such metrics within the worker runtime where you have access to things like MetricsRegistry (I don't know how your current metrics integration is done into with this one).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, thanks for the feedback! are you opposed to adding jobVersion to the ExecuteStageRequest? I think we'll need it to capture the jobVersion on the worker

}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public void testTaskExecutorEndToEndWithASingleStageJobByLoadingFromClassLoader(
1L,
new WorkerPorts(2, 3, 4, 5, 6),
Optional.of(SineFunctionJobProvider.class.getName()),
"user")), Time.seconds(1));
"user",
"111")), Time.seconds(1));
wait.get();
Assert.assertTrue(startedSignal.await(5, TimeUnit.SECONDS));
Subscription subscription = HttpSources.source(HttpClientFactories.sseClientFactory(),
Expand Down
Loading