Skip to content

Commit

Permalink
Fix jobs endpoints in all api servers to return bytes & records (#11529)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonsSpaghetti committed Mar 6, 2024
1 parent 4c820b6 commit 89139e2
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,7 @@ object JobResponseMapper {
* @return JobResponse Response object which contains job id, status, and job type
*/
fun from(jobInfoRead: JobInfoRead): JobResponse {
val jobResponse: JobResponse = jobResponseFromJobReadMinusSyncedData(jobInfoRead.job)
if (jobInfoRead.attempts.size > 0) {
val lastAttempt = jobInfoRead.attempts[jobInfoRead.attempts.size - 1]
jobResponse.setBytesSynced(lastAttempt.attempt.totalStats?.bytesCommitted)
jobResponse.setRowsSynced(lastAttempt.attempt.totalStats?.recordsCommitted)
}
return jobResponse
return jobResponseFromJobReadMinusSyncedData(jobInfoRead.job)
}

/**
Expand All @@ -50,41 +44,37 @@ object JobResponseMapper {
* @return JobResponse Response object which contains job id, status, and job type
*/
fun from(jobWithAttemptsRead: JobWithAttemptsRead): JobResponse {
val jobResponse: JobResponse = jobResponseFromJobReadMinusSyncedData(jobWithAttemptsRead.job)
if (jobWithAttemptsRead.attempts != null && jobWithAttemptsRead.attempts!!.size > 0) {
val lastAttempt = jobWithAttemptsRead.attempts!![jobWithAttemptsRead.attempts!!.size - 1]

jobResponse.setBytesSynced(lastAttempt.totalStats?.bytesCommitted)
jobResponse.setRowsSynced(lastAttempt.totalStats?.recordsCommitted)
}
return jobResponse
return jobResponseFromJobReadMinusSyncedData(jobWithAttemptsRead.job)
}

/**
* Converts a JobRead object from the config api to a JobResponse object.
*/
private fun jobResponseFromJobReadMinusSyncedData(jobRead: JobRead?): JobResponse {
val jobResponse = JobResponse()
jobResponse.setJobId(jobRead!!.id)
jobResponse.setStatus(JobStatusEnum.fromValue(jobRead.status.toString()))
jobResponse.setConnectionId(UUID.fromString(jobRead.configId))
jobResponse.jobId = jobRead!!.id
jobResponse.status = JobStatusEnum.fromValue(jobRead.status.toString())
jobResponse.connectionId = UUID.fromString(jobRead.configId)
when (jobRead.configType) {
JobConfigType.SYNC -> jobResponse.setJobType(JobTypeEnum.SYNC)
JobConfigType.RESET_CONNECTION -> jobResponse.setJobType(JobTypeEnum.RESET)
JobConfigType.SYNC -> jobResponse.jobType = JobTypeEnum.SYNC
JobConfigType.RESET_CONNECTION -> jobResponse.jobType = JobTypeEnum.RESET
else -> {
assert(ALLOWED_CONFIG_TYPES.contains(jobRead.configType))
}
}
// set to string for now since the jax-rs response entity turns offsetdatetime into epoch seconds
jobResponse.setStartTime(OffsetDateTime.ofInstant(Instant.ofEpochSecond(jobRead.createdAt), UTC).toString())
jobResponse.startTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(jobRead.createdAt), UTC).toString()
if (TERMINAL_JOB_STATUS.contains(jobRead.status)) {
jobResponse.setLastUpdatedAt(OffsetDateTime.ofInstant(Instant.ofEpochSecond(jobRead.updatedAt), UTC).toString())
jobResponse.lastUpdatedAt = OffsetDateTime.ofInstant(Instant.ofEpochSecond(jobRead.updatedAt), UTC).toString()
}

// duration is ISO_8601 formatted https://en.wikipedia.org/wiki/ISO_8601#Durations
if (jobRead.status != JobStatus.PENDING) {
jobResponse.setDuration(Duration.ofSeconds(jobRead.updatedAt - jobRead.createdAt).toString())
jobResponse.duration = Duration.ofSeconds(jobRead.updatedAt - jobRead.createdAt).toString()
}

jobResponse.bytesSynced = jobRead.aggregatedStats?.bytesCommitted
jobResponse.rowsSynced = jobRead.aggregatedStats?.recordsCommitted
return jobResponse
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,6 @@ public JobInfoRead getJobInfoRead(final Job job) {
.attempts(job.getAttempts().stream().map(this::getAttemptInfoRead).collect(Collectors.toList()));
}

public JobInfoRead getJobInfoWithoutLogsRead(final Job job) {
return new JobInfoRead()
.job(getJobWithAttemptsRead(job).getJob())
.attempts(job.getAttempts().stream().map(this::getAttemptInfoWithoutLogsRead).collect(Collectors.toList()));
}

public JobInfoLightRead getJobInfoLightRead(final Job job) {
return new JobInfoLightRead().job(getJobRead(job));
}
Expand Down Expand Up @@ -157,7 +151,7 @@ public AttemptInfoRead getAttemptInfoRead(final Attempt attempt) {
.logs(getLogRead(attempt.getLogPath()));
}

public AttemptInfoRead getAttemptInfoWithoutLogsRead(final Attempt attempt) {
public static AttemptInfoRead getAttemptInfoWithoutLogsRead(final Attempt attempt) {
return new AttemptInfoRead()
.attempt(getAttemptRead(attempt));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,13 @@ public JobInfoRead getJobInfo(final JobIdRequestBody jobIdRequestBody) throws IO

public JobInfoRead getJobInfoWithoutLogs(final JobIdRequestBody jobIdRequestBody) throws IOException {
final Job job = jobPersistence.getJob(jobIdRequestBody.getId());
return jobConverter.getJobInfoWithoutLogsRead(job);

final JobWithAttemptsRead jobWithAttemptsRead = JobConverter.getJobWithAttemptsRead(job);
hydrateWithStats(List.of(jobWithAttemptsRead), List.of(job), true);

return new JobInfoRead()
.job(jobWithAttemptsRead.getJob())
.attempts(job.getAttempts().stream().map(JobConverter::getAttemptInfoWithoutLogsRead).collect(Collectors.toList()));
}

public JobInfoLightRead getJobInfoLight(final JobIdRequestBody jobIdRequestBody) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -681,4 +681,20 @@ void testGetAttemptNormalizationStatuses() throws IOException {

}

@Test
@DisplayName("Should test to ensure that JobInfoReadWithoutLogs includes the bytes and records committed")
void testGetJobInfoWithoutLogs() throws IOException {

when(jobPersistence.getJob(JOB_ID))
.thenReturn(new Job(JOB_ID, JOB_CONFIG.getConfigType(), JOB_CONFIG_ID, JOB_CONFIG, ImmutableList.of(testJobAttempt),
JOB_STATUS, null, CREATED_AT, CREATED_AT));
when(jobPersistence.getAttemptStats(List.of(JOB_ID)))
.thenReturn(Map.of(new JobAttemptPair(JOB_ID, testJobAttempt.getAttemptNumber()), FIRST_ATTEMPT_STATS));

final JobInfoRead resultingJobInfo = jobHistoryHandler.getJobInfoWithoutLogs(new JobIdRequestBody().id(JOB_ID));
assertEquals(resultingJobInfo.getJob().getAggregatedStats().getBytesCommitted(), FIRST_ATTEMPT_STATS.combinedStats().getBytesCommitted());
assertEquals(resultingJobInfo.getJob().getAggregatedStats().getRecordsCommitted(), FIRST_ATTEMPT_STATS.combinedStats().getRecordsCommitted());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class AirbyteApiAuthorizationHelper(
}

private fun buildPropertiesMapForJob(id: String): Map<String, String> {
return mapOf(Scope.JOB.mappedHeaderProperty to Jsons.serialize(id))
return mapOf(Scope.JOB.mappedHeaderProperty to id)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ class JobsFilter(
/**
* Convert Airbyte API job status to config API job status.
*/
fun getConfigApiStatus(): JobStatus? = status?.let { JobStatus.fromValue(it.toString()) }
fun getConfigApiStatuses(): List<JobStatus>? = if (status == null) null else JobStatus.entries.filter { it.name == status.name }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,7 @@ object JobResponseMapper {
* @return JobResponse Response object which contains job id, status, and job type
*/
fun from(jobInfoRead: JobInfoRead): JobResponse {
val jobResponse: JobResponse = jobResponseFromJobReadMinusSyncedData(jobInfoRead.job)
if (jobInfoRead.attempts.size > 0) {
val lastAttempt = jobInfoRead.attempts[jobInfoRead.attempts.size - 1]
jobResponse.setBytesSynced(lastAttempt.attempt.totalStats?.bytesCommitted)
jobResponse.setRowsSynced(lastAttempt.attempt.totalStats?.recordsCommitted)
}
return jobResponse
return fromJobRead(jobInfoRead.job)
}

/**
Expand All @@ -49,41 +43,37 @@ object JobResponseMapper {
* @return JobResponse Response object which contains job id, status, and job type
*/
fun from(jobWithAttemptsRead: JobWithAttemptsRead): JobResponse {
val jobResponse: JobResponse = jobResponseFromJobReadMinusSyncedData(jobWithAttemptsRead.job)
if (jobWithAttemptsRead.attempts != null && jobWithAttemptsRead.attempts!!.size > 0) {
val lastAttempt = jobWithAttemptsRead.attempts!![jobWithAttemptsRead.attempts!!.size - 1]

jobResponse.setBytesSynced(lastAttempt.totalStats?.bytesCommitted)
jobResponse.setRowsSynced(lastAttempt.totalStats?.recordsCommitted)
}
return jobResponse
return fromJobRead(jobWithAttemptsRead.job)
}

/**
* Converts a JobRead object from the config api to a JobResponse object.
*/
private fun jobResponseFromJobReadMinusSyncedData(jobRead: JobRead?): JobResponse {
private fun fromJobRead(jobRead: JobRead?): JobResponse {
val jobResponse = JobResponse()
jobResponse.setJobId(jobRead!!.id)
jobResponse.setStatus(JobStatusEnum.fromValue(jobRead.status.toString()))
jobResponse.setConnectionId(UUID.fromString(jobRead.configId))
jobResponse.jobId = jobRead!!.id
jobResponse.status = JobStatusEnum.fromValue(jobRead.status.toString())
jobResponse.connectionId = UUID.fromString(jobRead.configId)
when (jobRead.configType) {
JobConfigType.SYNC -> jobResponse.setJobType(JobTypeEnum.SYNC)
JobConfigType.RESET_CONNECTION -> jobResponse.setJobType(JobTypeEnum.RESET)
JobConfigType.SYNC -> jobResponse.jobType = JobTypeEnum.SYNC
JobConfigType.RESET_CONNECTION -> jobResponse.jobType = JobTypeEnum.RESET
else -> {
assert(ALLOWED_CONFIG_TYPES.contains(jobRead.configType))
}
}
// set to string for now since the jax-rs response entity turns offsetdatetime into epoch seconds
jobResponse.setStartTime(OffsetDateTime.ofInstant(Instant.ofEpochSecond(jobRead.createdAt), UTC).toString())
jobResponse.startTime = OffsetDateTime.ofInstant(Instant.ofEpochSecond(jobRead.createdAt), UTC).toString()
if (TERMINAL_JOB_STATUS.contains(jobRead.status)) {
jobResponse.setLastUpdatedAt(OffsetDateTime.ofInstant(Instant.ofEpochSecond(jobRead.updatedAt), UTC).toString())
jobResponse.lastUpdatedAt = OffsetDateTime.ofInstant(Instant.ofEpochSecond(jobRead.updatedAt), UTC).toString()
}

// duration is ISO_8601 formatted https://en.wikipedia.org/wiki/ISO_8601#Durations
if (jobRead.status != JobStatus.PENDING) {
jobResponse.setDuration(Duration.ofSeconds(jobRead.updatedAt - jobRead.createdAt).toString())
jobResponse.duration = Duration.ofSeconds(jobRead.updatedAt - jobRead.createdAt).toString()
}

jobResponse.bytesSynced = jobRead.aggregatedStats?.bytesCommitted
jobResponse.rowsSynced = jobRead.aggregatedStats?.recordsCommitted
return jobResponse
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class JobServiceImpl(
.configId(connectionId.toString())
.configTypes(configTypes)
.pagination(Pagination().pageSize(jobsFilter.limit).rowOffset(jobsFilter.offset))
.statuses(listOf(jobsFilter.getConfigApiStatus()))
.statuses(jobsFilter.getConfigApiStatuses())
.createdAtStart(jobsFilter.createdAtStart)
.createdAtEnd(jobsFilter.createdAtEnd)
.updatedAtStart(jobsFilter.updatedAtStart)
Expand Down Expand Up @@ -190,7 +190,7 @@ class JobServiceImpl(
.workspaceIds(workspaceIdsToQuery)
.configTypes(configTypes)
.pagination(Pagination().pageSize(jobsFilter.limit).rowOffset(jobsFilter.offset))
.statuses(listOf(jobsFilter.getConfigApiStatus()))
.statuses(jobsFilter.getConfigApiStatuses())
.createdAtStart(jobsFilter.createdAtStart)
.createdAtEnd(jobsFilter.createdAtEnd)
.updatedAtStart(jobsFilter.updatedAtStart)
Expand Down

0 comments on commit 89139e2

Please sign in to comment.