Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pivotal ID # 184150619: Request retry mechanism #668

Merged
merged 4 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,6 @@ enum class RequestStatus {
PROCESSED;

companion object {
val PROCESSING: Set<RequestStatus> = setOf(REQUESTED, LOADED, CLEANED, FILES_COPIED, CHECK_RELEASED)
val PROCESSING: Set<RequestStatus> = setOf(REQUESTED, INDEXED, LOADED, CLEANED, FILES_COPIED, CHECK_RELEASED)
jhoanmanuelms marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import ac.uk.ebi.biostd.persistence.common.request.SubmissionFilter
import ebi.ac.uk.extended.model.ExtFile
import ebi.ac.uk.extended.model.ExtSubmission
import org.springframework.data.domain.Page
import java.time.temporal.TemporalAmount

interface SubmissionPersistenceService {
fun saveSubmission(submission: ExtSubmission): ExtSubmission
Expand Down Expand Up @@ -57,6 +58,8 @@ interface SubmissionPersistenceQueryService {
interface SubmissionRequestPersistenceService {
fun hasActiveRequest(accNo: String): Boolean

fun getProcessingRequests(since: TemporalAmount? = null): List<Pair<String, Int>>

fun saveSubmissionRequest(rqt: SubmissionRequest): Pair<String, Int>

fun createSubmissionRequest(rqt: SubmissionRequest): Pair<String, Int>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.springframework.data.mongodb.core.query.Meta.CursorOption
import org.springframework.data.mongodb.repository.Meta
import org.springframework.data.mongodb.repository.MongoRepository
import org.springframework.data.mongodb.repository.Query
import java.time.Instant
import java.util.stream.Stream

interface SubmissionMongoRepository : MongoRepository<DocSubmission, ObjectId> {
Expand Down Expand Up @@ -52,6 +53,13 @@ interface SubmissionRequestRepository : MongoRepository<DocSubmissionRequest, St
fun existsByAccNoAndStatusIn(accNo: String, status: Set<RequestStatus>): Boolean

fun getByAccNoAndVersion(accNo: String, version: Int): DocSubmissionRequest

fun findByStatusIn(status: Set<RequestStatus>): List<DocSubmissionRequest>

fun findByStatusInAndModificationTimeLessThan(
status: Set<RequestStatus>,
since: Instant,
): List<DocSubmissionRequest>
}

interface SubmissionRequestFilesRepository : MongoRepository<DocSubmissionRequestFile, ObjectId> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ac.uk.ebi.biostd.persistence.doc.service
import ac.uk.ebi.biostd.persistence.common.model.RequestStatus
import ac.uk.ebi.biostd.persistence.common.model.RequestStatus.CHECK_RELEASED
import ac.uk.ebi.biostd.persistence.common.model.RequestStatus.CLEANED
import ac.uk.ebi.biostd.persistence.common.model.RequestStatus.Companion.PROCESSING
import ac.uk.ebi.biostd.persistence.common.model.RequestStatus.FILES_COPIED
import ac.uk.ebi.biostd.persistence.common.model.RequestStatus.INDEXED
import ac.uk.ebi.biostd.persistence.common.model.RequestStatus.LOADED
Expand All @@ -16,15 +17,25 @@ import com.mongodb.BasicDBObject
import org.bson.types.ObjectId
import uk.ac.ebi.extended.serialization.service.ExtSerializationService
import uk.ac.ebi.extended.serialization.service.Properties
import java.time.Instant
import java.time.ZoneOffset.UTC
import java.time.temporal.TemporalAmount

@Suppress("TooManyFunctions")
class SubmissionRequestMongoPersistenceService(
private val serializationService: ExtSerializationService,
private val requestRepository: SubmissionRequestDocDataRepository,
) : SubmissionRequestPersistenceService {
override fun hasActiveRequest(accNo: String): Boolean {
return requestRepository.existsByAccNoAndStatusIn(accNo, RequestStatus.PROCESSING)
return requestRepository.existsByAccNoAndStatusIn(accNo, PROCESSING)
}

override fun getProcessingRequests(since: TemporalAmount?): List<Pair<String, Int>> {
val request = when (since) {
null -> requestRepository.findByStatusIn(PROCESSING)
else -> requestRepository.findByStatusInAndModificationTimeLessThan(PROCESSING, Instant.now().minus(since))
}
return request.map { it.accNo to it.version }
}

override fun saveSubmissionRequest(rqt: SubmissionRequest): Pair<String, Int> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ac.uk.ebi.biostd.persistence.doc.service

import ac.uk.ebi.biostd.persistence.common.model.RequestStatus
import ac.uk.ebi.biostd.persistence.common.model.RequestStatus.CLEANED
import ac.uk.ebi.biostd.persistence.common.model.RequestStatus.FILES_COPIED
import ac.uk.ebi.biostd.persistence.common.model.SubmissionRequestFile
import ac.uk.ebi.biostd.persistence.doc.db.data.SubmissionRequestDocDataRepository
import ac.uk.ebi.biostd.persistence.doc.db.repositories.SubmissionRequestFilesRepository
Expand Down Expand Up @@ -62,6 +64,30 @@ class SubmissionRequestMongoPersistenceServiceTest(
unmockkStatic(Instant::class)
}

@Test
fun getProcessingRequests() {
fun testRequest(accNo: String, version: Int, modificationTime: Instant, status: RequestStatus) =
DocSubmissionRequest(
id = ObjectId(),
accNo = accNo,
version = version,
status = status,
draftKey = null,
notifyTo = "user@test.org",
submission = BasicDBObject.parse(jsonObj { "submission" to "S-BSST0" }.toString()),
totalFiles = 5,
currentIndex = 0,
modificationTime = modificationTime
)

requestRepository.save(testRequest("abc", 1, Instant.now().minusSeconds(10), CLEANED))
requestRepository.save(testRequest("zxy", 2, Instant.now().minusSeconds(20), FILES_COPIED))

assertThat(testInstance.getProcessingRequests()).containsExactly("abc" to 1, "zxy" to 2)
assertThat(testInstance.getProcessingRequests(ofSeconds(5))).containsExactly("abc" to 1, "zxy" to 2)
assertThat(testInstance.getProcessingRequests(ofSeconds(15))).containsExactly("zxy" to 2)
}

@Test
fun `update requestFile`() {
val extFile = createNfsFile("requested.txt", "Files/requested.txt", tempFolder.createFile("requested.txt"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import ac.uk.ebi.biostd.submission.domain.helpers.OnBehalfUtils
import ac.uk.ebi.biostd.submission.domain.helpers.TempFileGenerator
import ac.uk.ebi.biostd.submission.domain.service.ExtSubmissionQueryService
import ac.uk.ebi.biostd.submission.domain.service.ExtSubmissionService
import ac.uk.ebi.biostd.submission.domain.service.RetryHandler
import ac.uk.ebi.biostd.submission.domain.service.SubmissionDraftService
import ac.uk.ebi.biostd.submission.domain.service.SubmissionQueryService
import ac.uk.ebi.biostd.submission.domain.service.SubmissionService
Expand Down Expand Up @@ -109,6 +110,12 @@ class SubmissionConfig(
eventsPublisherService,
)

@Bean
fun startApplicationHandler(
extSubmissionService: ExtSubmissionService,
requestService: SubmissionRequestPersistenceService,
) = RetryHandler(extSubmissionService, requestService)

@Bean
fun projectService(
collectionSqlDataService: CollectionDataService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class ExtSubmissionService(
private val properties: ApplicationProperties,
private val eventsPublisherService: EventsPublisherService,
) {

fun reTriggerSubmission(accNo: String, version: Int): ExtSubmission {
return submissionSubmitter.handleRequest(accNo, version)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ac.uk.ebi.biostd.submission.domain.service

import ac.uk.ebi.biostd.persistence.common.service.SubmissionRequestPersistenceService
import mu.KotlinLogging
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.scheduling.annotation.Scheduled
import java.time.Duration
import java.time.temporal.ChronoUnit

private val logger = KotlinLogging.logger {}

/**
* Re trigger pending per processing request on application start.
*/
class RetryHandler(
private val extSubmissionService: ExtSubmissionService,
private val requestService: SubmissionRequestPersistenceService,
) {

@EventListener(ApplicationReadyEvent::class)
fun onStart() {
logger.info { "Re processing pending submission on application start" }
requestService.getProcessingRequests()
.forEach { (accNo, version) -> reTriggerSafely(accNo, version) }
}

@Scheduled(cron = "0 0 */3 * * ?")
@Suppress("MagicNumber")
fun onSchedule() {
logger.info { "Scheduled re processing of pending submission" }
requestService.getProcessingRequests(Duration.of(3, ChronoUnit.HOURS))
.forEach { (accNo, version) -> reTriggerSafely(accNo, version) }
}

private fun reTriggerSafely(accNo: String, version: Int) {
runCatching { extSubmissionService.reTriggerSubmission(accNo, version) }
.onFailure { logger.error { "Failed to re triggering request accNo='$accNo', version='$version'" } }
.onSuccess { logger.info { "Completed processing of request accNo='$accNo', version='$version'" } }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package ac.uk.ebi.biostd.submission.domain.service

import ac.uk.ebi.biostd.persistence.common.service.SubmissionRequestPersistenceService
import ebi.ac.uk.extended.model.ExtSubmission
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import io.mockk.verify
import org.junit.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.time.Duration
import java.time.temporal.ChronoUnit.HOURS

@ExtendWith(MockKExtension::class)
class RetryHandlerTest(
@MockK private val extSubmissionService: ExtSubmissionService,
@MockK private val requestService: SubmissionRequestPersistenceService,
) {

private val testInstance = RetryHandler(extSubmissionService, requestService)

@Test
fun onStart(@MockK submission: ExtSubmission) {
every { requestService.getProcessingRequests() } returns listOf("a" to 1, "b" to 2)
every { extSubmissionService.reTriggerSubmission("a", 1) } throws IllegalStateException("Error trigger")
every { extSubmissionService.reTriggerSubmission("b", 2) } answers { submission }

testInstance.onStart()

verify(exactly = 1) { extSubmissionService.reTriggerSubmission("a", 1) }
verify(exactly = 1) { extSubmissionService.reTriggerSubmission("b", 2) }
}

@Test
fun onSchedule(@MockK submission: ExtSubmission) {
every { requestService.getProcessingRequests(Duration.of(3, HOURS)) } returns listOf("a" to 1, "b" to 2)
every { extSubmissionService.reTriggerSubmission("a", 1) } throws IllegalStateException("Error trigger")
every { extSubmissionService.reTriggerSubmission("b", 2) } answers { submission }

testInstance.onStart()

verify(exactly = 1) { extSubmissionService.reTriggerSubmission("a", 1) }
verify(exactly = 1) { extSubmissionService.reTriggerSubmission("b", 2) }
}
}