Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pivotal ID # 185352396: Fixed Concurrency File Operations #744

Merged
merged 2 commits into from
Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ interface SubmissionRequestPersistenceService {

fun getProcessingRequests(since: TemporalAmount? = null): List<Pair<String, Int>>

fun saveSubmissionRequest(rqt: SubmissionRequest): Pair<String, Int>
suspend fun saveSubmissionRequest(rqt: SubmissionRequest): Pair<String, Int>

fun createSubmissionRequest(rqt: SubmissionRequest): Pair<String, Int>

fun updateRqtIndex(accNo: String, version: Int, index: Int)
suspend fun updateRqtIndex(accNo: String, version: Int, index: Int)

fun updateRqtIndex(requestFile: SubmissionRequestFile, file: ExtFile)
suspend fun updateRqtIndex(requestFile: SubmissionRequestFile, file: ExtFile)

fun getPendingRequest(accNo: String, version: Int): SubmissionRequest

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import ac.uk.ebi.biostd.persistence.doc.model.DocSubmissionRequestFile
import com.google.common.collect.ImmutableList
import com.mongodb.BasicDBObject
import ebi.ac.uk.extended.model.ExtFile
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.springframework.data.mongodb.core.ReactiveMongoTemplate
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Criteria.where
Expand Down Expand Up @@ -64,7 +65,7 @@ class SubmissionRequestDocDataRepository(
email: String? = null,
): Pair<Int, List<DocSubmissionRequest>> {
val query = Query().addCriteria(createQuery(filter, email))
val requestCount = mongoTemplate.count(query, DocSubmissionRequest::class.java).block()
val requestCount = mongoTemplate.count(query, DocSubmissionRequest::class.java).block()!!
return when {
requestCount <= filter.offset -> requestCount.toInt() to emptyList()
else -> findActiveRequests(query, filter.offset, filter.limit)
Expand All @@ -87,11 +88,11 @@ class SubmissionRequestDocDataRepository(
where("$SUB.$SUB_OWNER").`is`(email)
.andOperator(*criteriaArray(filter))

fun updateIndex(accNo: String, version: Int, index: Int) {
suspend fun updateIndex(accNo: String, version: Int, index: Int) {
val update = Update().set(RQT_IDX, index).set(RQT_MODIFICATION_TIME, Instant.now())
val query = Query(where(SUB_ACC_NO).`is`(accNo).andOperator(where(SUB_VERSION).`is`(version)))

mongoTemplate.updateFirst(query, update, DocSubmissionRequest::class.java).block()
mongoTemplate.updateFirst(query, update, DocSubmissionRequest::class.java).awaitSingleOrNull()
}

fun upsertSubmissionRequestFile(rqtFile: SubmissionRequestFile) {
Expand All @@ -110,7 +111,7 @@ class SubmissionRequestDocDataRepository(
mongoTemplate.updateFirst(Query(where), update, DocSubmissionRequestFile::class.java).block()
}

fun updateSubmissionRequest(rqt: DocSubmissionRequest) {
suspend fun updateSubmissionRequest(rqt: DocSubmissionRequest) {
val query = Query(where(SUB_ACC_NO).`is`(rqt.accNo).andOperator(where(SUB_VERSION).`is`(rqt.version)))
val update = Update()
.set(SUB_STATUS, rqt.status)
Expand All @@ -120,7 +121,7 @@ class SubmissionRequestDocDataRepository(
.set(RQT_TOTAL_FILES, rqt.totalFiles)
.set(RQT_MODIFICATION_TIME, rqt.modificationTime)

mongoTemplate.updateFirst(query, update, DocSubmissionRequest::class.java).block()
mongoTemplate.updateFirst(query, update, DocSubmissionRequest::class.java).awaitSingleOrNull()
}

private fun criteriaArray(filter: SubmissionFilter): Array<Criteria> =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class SubmissionRequestMongoPersistenceService(
return request.map { it.accNo to it.version }
}

override fun saveSubmissionRequest(rqt: SubmissionRequest): Pair<String, Int> {
override suspend fun saveSubmissionRequest(rqt: SubmissionRequest): Pair<String, Int> {
requestRepository.updateSubmissionRequest(asRequest(rqt))
return rqt.submission.accNo to rqt.submission.version
}
Expand All @@ -52,11 +52,11 @@ class SubmissionRequestMongoPersistenceService(
return request.accNo to request.version
}

override fun updateRqtIndex(accNo: String, version: Int, index: Int) {
override suspend fun updateRqtIndex(accNo: String, version: Int, index: Int) {
requestRepository.updateIndex(accNo, version, index)
}

override fun updateRqtIndex(requestFile: SubmissionRequestFile, file: ExtFile) {
override suspend fun updateRqtIndex(requestFile: SubmissionRequestFile, file: ExtFile) {
requestRepository.updateIndex(requestFile.accNo, requestFile.version, requestFile.index)
requestRepository.updateSubmissionRequestFile(requestFile.accNo, requestFile.version, requestFile.path, file)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import io.mockk.mockkStatic
import io.mockk.unmockkStatic
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import org.assertj.core.api.Assertions.assertThat
import org.bson.types.ObjectId
import org.junit.jupiter.api.AfterEach
Expand All @@ -42,6 +44,7 @@ import java.time.Instant

@ExtendWith(MockKExtension::class, SpringExtension::class, TemporaryFolderExtension::class)
@Testcontainers
@OptIn(ExperimentalCoroutinesApi::class)
@SpringBootTest(classes = [MongoDbReposConfig::class])
class SubmissionRequestMongoPersistenceServiceTest(
private val tempFolder: TemporaryFolder,
Expand Down Expand Up @@ -89,7 +92,7 @@ class SubmissionRequestMongoPersistenceServiceTest(
}

@Test
fun `update requestFile`() {
fun `update requestFile`() = runTest {
val extFile = createNfsFile("requested.txt", "Files/requested.txt", tempFolder.createFile("requested.txt"))
val requestFile = SubmissionRequestFile("S-BSST0", 1, index = 2, "requested.txt", extFile)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import ebi.ac.uk.dsl.tsv.line
import ebi.ac.uk.dsl.tsv.tsv
import ebi.ac.uk.extended.mapping.to.ToSubmissionMapper
import ebi.ac.uk.model.extensions.title
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import org.assertj.core.api.Assertions.assertThat
import org.awaitility.Awaitility.await
import org.junit.jupiter.api.BeforeAll
Expand All @@ -38,6 +40,7 @@ import java.time.Duration.ofSeconds

@Import(FilePersistenceConfig::class)
@ExtendWith(SpringExtension::class)
@OptIn(ExperimentalCoroutinesApi::class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class SubmissionAsyncTest(
@Autowired val securityTestService: SecurityTestService,
Expand Down Expand Up @@ -84,7 +87,7 @@ class SubmissionAsyncTest(
}

@Test
fun `19-2 check submission stages`() {
fun `19-2 check submission stages`() = runTest {
val submission = tsv {
line("Submission", "SimpleAsync2")
line("Title", "Submission Stages")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ExtSubmissionService(
private val securityService: ISecurityQueryService,
private val eventsPublisherService: EventsPublisherService,
) {
fun reTriggerSubmission(accNo: String, version: Int): ExtSubmission {
suspend fun reTriggerSubmission(accNo: String, version: Int): ExtSubmission {
return submissionSubmitter.handleRequest(accNo, version)
}

Expand All @@ -34,7 +34,7 @@ class ExtSubmissionService(
return submissionSubmitter.createRequest(ExtSubmitRequest(submission, user))
}

fun submitExt(
suspend fun submitExt(
user: String,
sub: ExtSubmission,
): ExtSubmission {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ac.uk.ebi.biostd.submission.domain.service

import ac.uk.ebi.biostd.persistence.common.service.SubmissionRequestPersistenceService
import kotlinx.coroutines.runBlocking
import mu.KotlinLogging
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
Expand Down Expand Up @@ -34,7 +35,7 @@ class RetryHandler(
}

private fun reTriggerSafely(accNo: String, version: Int) {
runCatching { extSubmissionService.reTriggerSubmission(accNo, version) }
runCatching { runBlocking { extSubmissionService.reTriggerSubmission(accNo, version) } }
.onFailure { logger.error { "Failed to re triggering request accNo='$accNo', version='$version'" } }
.onSuccess { logger.info { "Completed processing of request accNo='$accNo', version='$version'" } }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ class SubmissionStagesHandler(
fun calculateStats(rqt: RequestFinalized) {
processSafely(rqt) {
logger.info { "$accNo, Received finalized message for submission $accNo, version: $version" }
runBlocking { statsService.calculateSubFilesSize(accNo) }
statsService.calculateSubFilesSize(accNo)
}
}

private fun processSafely(request: RequestMessage, process: RequestMessage.() -> Unit) {
private fun processSafely(request: RequestMessage, process: suspend RequestMessage.() -> Unit) = runBlocking {
runCatching { process(request) }.onFailure { onError(it, request) }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ class ExtSubmissionResource(
): ExtFileTable = extSubmissionQueryService.getReferencedFiles(accNo, fileListPath.path)

@PostMapping("/re-trigger/{accNo}/{version}")
fun reTriggerSubmission(
suspend fun reTriggerSubmission(
@PathVariable accNo: String,
@PathVariable version: Int,
): ExtSubmission = extSubmissionService.reTriggerSubmission(accNo, version)

@PostMapping("/refresh/{accNo}")
fun refreshSubmission(
suspend fun refreshSubmission(
@BioUser user: SecurityUser,
@PathVariable accNo: String,
) = extSubmissionService.refreshSubmission(user.email, accNo)
Expand All @@ -64,7 +64,7 @@ class ExtSubmissionResource(

@PostMapping
@PreAuthorize("isAuthenticated()")
fun submitExtended(
suspend fun submitExtended(
@BioUser user: SecurityUser,
@RequestParam(SUBMISSION) extSubmission: String,
): ExtSubmission = extSubmissionService.submitExt(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ spring:
client:
enabled: false

mvc:
async:
request-timeout: -1

data:
mongodb:
database: # Mongo Database Name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package ac.uk.ebi.biostd.submission.domain.service

import ac.uk.ebi.biostd.persistence.common.service.SubmissionRequestPersistenceService
import ebi.ac.uk.extended.model.ExtSubmission
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import io.mockk.verify
import org.junit.Test
import org.junit.jupiter.api.extension.ExtendWith
import java.time.Duration
Expand All @@ -16,30 +17,29 @@ class RetryHandlerTest(
@MockK private val extSubmissionService: ExtSubmissionService,
@MockK private val requestService: SubmissionRequestPersistenceService,
) {

private val testInstance = RetryHandler(extSubmissionService, requestService)

@Test
fun onStart(@MockK submission: ExtSubmission) {
every { requestService.getProcessingRequests() } returns listOf("a" to 1, "b" to 2)
every { extSubmissionService.reTriggerSubmission("a", 1) } throws IllegalStateException("Error trigger")
every { extSubmissionService.reTriggerSubmission("b", 2) } answers { submission }
coEvery { extSubmissionService.reTriggerSubmission("a", 1) } throws IllegalStateException("Error trigger")
coEvery { extSubmissionService.reTriggerSubmission("b", 2) } answers { submission }

testInstance.onStart()

verify(exactly = 1) { extSubmissionService.reTriggerSubmission("a", 1) }
verify(exactly = 1) { extSubmissionService.reTriggerSubmission("b", 2) }
coVerify(exactly = 1) { extSubmissionService.reTriggerSubmission("a", 1) }
coVerify(exactly = 1) { extSubmissionService.reTriggerSubmission("b", 2) }
}

@Test
fun onSchedule(@MockK submission: ExtSubmission) {
every { requestService.getProcessingRequests(Duration.of(3, HOURS)) } returns listOf("a" to 1, "b" to 2)
every { extSubmissionService.reTriggerSubmission("a", 1) } throws IllegalStateException("Error trigger")
every { extSubmissionService.reTriggerSubmission("b", 2) } answers { submission }
coEvery { extSubmissionService.reTriggerSubmission("a", 1) } throws IllegalStateException("Error trigger")
coEvery { extSubmissionService.reTriggerSubmission("b", 2) } answers { submission }

testInstance.onStart()

verify(exactly = 1) { extSubmissionService.reTriggerSubmission("a", 1) }
verify(exactly = 1) { extSubmissionService.reTriggerSubmission("b", 2) }
coVerify(exactly = 1) { extSubmissionService.reTriggerSubmission("a", 1) }
coVerify(exactly = 1) { extSubmissionService.reTriggerSubmission("b", 2) }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ class SubmissionStagesHandlerTest(
fun `index request`() {
val request = RequestCreated("S-BSTT0", 1)

every { submissionSubmitter.indexRequest(request) } answers { nothing }
coEvery { submissionSubmitter.indexRequest(request) } answers { nothing }
every { eventsPublisherService.requestIndexed("S-BSTT0", 1) } answers { nothing }

testInstance.indexRequest(request)

verify(exactly = 1) {
coVerify(exactly = 1) {
submissionSubmitter.indexRequest(request)
eventsPublisherService.requestIndexed("S-BSTT0", 1)
}
Expand All @@ -55,12 +55,12 @@ class SubmissionStagesHandlerTest(
fun `load request`() {
val request = RequestIndexed("S-BSTT0", 1)

every { submissionSubmitter.loadRequest(request) } answers { nothing }
coEvery { submissionSubmitter.loadRequest(request) } answers { nothing }
every { eventsPublisherService.requestLoaded("S-BSTT0", 1) } answers { nothing }

testInstance.loadRequest(request)

verify(exactly = 1) {
coVerify(exactly = 1) {
submissionSubmitter.loadRequest(request)
eventsPublisherService.requestLoaded("S-BSTT0", 1)
}
Expand All @@ -70,12 +70,12 @@ class SubmissionStagesHandlerTest(
fun `clean request`() {
val request = RequestLoaded("S-BSTT0", 1)

every { submissionSubmitter.cleanRequest(request) } answers { nothing }
coEvery { submissionSubmitter.cleanRequest(request) } answers { nothing }
every { eventsPublisherService.requestCleaned("S-BSTT0", 1) } answers { nothing }

testInstance.cleanRequest(request)

verify(exactly = 1) {
coVerify(exactly = 1) {
submissionSubmitter.cleanRequest(request)
eventsPublisherService.requestCleaned("S-BSTT0", 1)
}
Expand All @@ -85,12 +85,12 @@ class SubmissionStagesHandlerTest(
fun `copy request files`() {
val request = RequestCleaned("S-BSTT0", 1)

every { submissionSubmitter.processRequest(request) } answers { nothing }
coEvery { submissionSubmitter.processRequest(request) } answers { nothing }
every { eventsPublisherService.requestFilesCopied("S-BSTT0", 1) } answers { nothing }

testInstance.copyRequestFiles(request)

verify(exactly = 1) {
coVerify(exactly = 1) {
submissionSubmitter.processRequest(request)
eventsPublisherService.requestFilesCopied("S-BSTT0", 1)
}
Expand All @@ -100,12 +100,12 @@ class SubmissionStagesHandlerTest(
fun `check released`() {
val request = RequestFilesCopied("S-BSTT0", 1)

every { submissionSubmitter.checkReleased(request) } answers { nothing }
coEvery { submissionSubmitter.checkReleased(request) } answers { nothing }
every { eventsPublisherService.checkReleased("S-BSTT0", 1) } answers { nothing }

testInstance.checkReleased(request)

verify(exactly = 1) {
coVerify(exactly = 1) {
submissionSubmitter.checkReleased(request)
eventsPublisherService.checkReleased("S-BSTT0", 1)
}
Expand All @@ -119,12 +119,12 @@ class SubmissionStagesHandlerTest(

every { submission.accNo } returns "S-BSST0"
every { submission.owner } returns "owner@test.org"
every { submissionSubmitter.saveRequest(request) } returns submission
coEvery { submissionSubmitter.saveRequest(request) } returns submission
every { eventsPublisherService.submissionPersisted("S-BSTT0", 1) } answers { nothing }

testInstance.saveSubmission(request)

verify(exactly = 1) {
coVerify(exactly = 1) {
submissionSubmitter.saveRequest(request)
eventsPublisherService.submissionPersisted("S-BSTT0", 1)
}
Expand All @@ -135,11 +135,11 @@ class SubmissionStagesHandlerTest(
fun `finalize request`() {
val request = RequestPersisted("S-BSTT0", 1)

every { submissionSubmitter.finalizeRequest(request) } answers { nothing }
coEvery { submissionSubmitter.finalizeRequest(request) } answers { nothing }

testInstance.finalizeRequest(request)

verify(exactly = 1) { submissionSubmitter.finalizeRequest(request) }
coVerify(exactly = 1) { submissionSubmitter.finalizeRequest(request) }
verify(exactly = 0) { eventsPublisherService.submissionSubmitted(any(), any()) }
}

Expand Down
Loading