Skip to content

Commit

Permalink
Pivotal ID # 188131969: Archieve submission request (#868)
Browse files Browse the repository at this point in the history
  • Loading branch information
Juan-EBI authored Aug 20, 2024
1 parent b4f2628 commit c8cc4ba
Show file tree
Hide file tree
Showing 19 changed files with 393 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ internal class SubmissionServiceTest {

testInstance.submit(subRequest)

verify(exactly = 0) { bioWebClient.getSubmissionRequestStatus(any(), any()) }
coVerify(exactly = 0) { bioWebClient.getSubmissionRequestStatus(any(), any()) }
verify(exactly = 1) {
create(SERVER).getAuthenticatedClient(USER, PASSWORD, ON_BEHALF)
bioWebClient.submitMultipartAsync(
Expand All @@ -69,15 +69,15 @@ internal class SubmissionServiceTest {
runTest {
val accepted = AcceptedSubmission("S-BSST1", 2)

every { bioWebClient.getSubmissionRequestStatus("S-BSST1", 2) } returns PROCESSED
coEvery { bioWebClient.getSubmissionRequestStatus("S-BSST1", 2) } returns PROCESSED
every { create(SERVER).getAuthenticatedClient(USER, PASSWORD, ON_BEHALF) } returns bioWebClient
every {
bioWebClient.submitMultipartAsync(subRequest.submissionFile, subRequest.parameters)
} returns accepted

testInstance.submit(subRequest.copy(await = true))

verify(exactly = 1) {
coVerify(exactly = 1) {
create(SERVER).getAuthenticatedClient(USER, PASSWORD, ON_BEHALF)
bioWebClient.getSubmissionRequestStatus("S-BSST1", 2)
bioWebClient.submitMultipartAsync(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
package ac.uk.ebi.biostd.client.api

import ac.uk.ebi.biostd.client.integration.web.SubmissionRequestOperations
import ebi.ac.uk.commons.http.ext.getForObject
import ebi.ac.uk.model.RequestStatus
import kotlinx.coroutines.reactive.awaitSingle
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBody

private const val SUBMISSION_REQUEST_URL = "/submissions/requests"

class SubmissionRequestClient(
private val client: WebClient,
) : SubmissionRequestOperations {
override fun getSubmissionRequestStatus(
override suspend fun getSubmissionRequestStatus(
accNo: String,
version: Int,
): RequestStatus {
return client.getForObject("$SUBMISSION_REQUEST_URL/$accNo/$version/status")
): RequestStatus =
client
.post()
.uri("$SUBMISSION_REQUEST_URL/$accNo/$version/status")
.retrieve()
.awaitBody<RequestStatus>()

override suspend fun archiveSubmissionRequest(
accNo: String,
version: Int,
) {
client
.post()
.uri("$SUBMISSION_REQUEST_URL/$accNo/$version/archive")
.retrieve()
.bodyToMono(Void::class.java)
.awaitSingle()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,13 @@ interface MultipartAsyncSubmitOperations {
}

interface SubmissionRequestOperations {
fun getSubmissionRequestStatus(
suspend fun getSubmissionRequestStatus(
accNo: String,
version: Int,
): RequestStatus

suspend fun archiveSubmissionRequest(
accNo: String,
version: Int,
)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package ebi.ac.uk.db

const val MONGO_VERSION = "mongo:4.0.10"
const val MONGO_VERSION = "mongo:4.4.22"
const val RABBIT_VERSION = "rabbitmq:3.7.25-management-alpine"
const val MYSQL_VERSION = "mysql:5.7.33"
const val MYSQL_SCHEMA = "Schema.sql"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ interface SubmissionRequestPersistenceService {
accNo: String,
version: Int,
): Boolean

suspend fun archiveRequest(
accNo: String,
version: Int,
)
}

sealed interface OptResponse<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,9 @@ object DocRequestFields {
const val RQT_STATUS_CHANGE_END_TIME = "endTime"
const val RQT_STATUS_CHANGE_RESULT = "result"
}

object DocRequestFileFields {
const val RQT_FILE_ACC_NO = "accNo"
const val RQT_FILE_VERSION = "version"
const val RQT_FILE_PATH = "path"
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,33 @@ import ac.uk.ebi.biostd.persistence.doc.db.converters.shared.DocSubmissionReques
import ac.uk.ebi.biostd.persistence.doc.db.converters.shared.DocSubmissionRequestFileFields.RQT_FILE_SUB_VERSION
import ac.uk.ebi.biostd.persistence.doc.db.converters.shared.DocSubmissionRequestFileFields.RQT_PREVIOUS_SUB_FILE
import ac.uk.ebi.biostd.persistence.doc.db.reactive.repositories.SubmissionRequestRepository
import ac.uk.ebi.biostd.persistence.doc.model.CollectionsNames.RQT_ARCH_COL
import ac.uk.ebi.biostd.persistence.doc.model.CollectionsNames.RQT_FILE_ARCH_COL
import ac.uk.ebi.biostd.persistence.doc.model.DocRequestStatusChanges
import ac.uk.ebi.biostd.persistence.doc.model.DocSubmissionRequest
import ac.uk.ebi.biostd.persistence.doc.model.DocSubmissionRequestFile
import com.google.common.collect.ImmutableList
import com.mongodb.BasicDBObject
import ebi.ac.uk.coroutines.every
import ebi.ac.uk.model.RequestStatus
import ebi.ac.uk.model.RequestStatus.Companion.PROCESSING
import ebi.ac.uk.model.RequestStatus.PROCESSED
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.count
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import org.bson.types.ObjectId
import org.springframework.data.mongodb.core.FindAndModifyOptions
import org.springframework.data.mongodb.core.ReactiveMongoTemplate
import org.springframework.data.mongodb.core.aggregation.Aggregation
import org.springframework.data.mongodb.core.aggregation.Aggregation.match
import org.springframework.data.mongodb.core.aggregation.AggregationOptions
import org.springframework.data.mongodb.core.aggregation.Fields
import org.springframework.data.mongodb.core.aggregation.MergeOperation.WhenDocumentsDontMatch
import org.springframework.data.mongodb.core.aggregation.MergeOperation.WhenDocumentsMatch
import org.springframework.data.mongodb.core.query.Criteria
import org.springframework.data.mongodb.core.query.Criteria.where
import org.springframework.data.mongodb.core.query.Query
Expand All @@ -69,15 +81,77 @@ class SubmissionRequestDocDataRepository(
) : SubmissionRequestRepository by submissionRequestRepository {
suspend fun saveRequest(request: DocSubmissionRequest): Pair<DocSubmissionRequest, Boolean> {
val result =
mongoTemplate.upsert(
Query(where(RQT_ACC_NO).`is`(request.accNo).andOperator(where(RQT_STATUS).ne(PROCESSED))),
request.asSetOnInsert(),
DocSubmissionRequest::class.java,
).awaitSingle()
mongoTemplate
.upsert(
Query(where(RQT_ACC_NO).`is`(request.accNo).andOperator(where(RQT_STATUS).ne(PROCESSED))),
request.asSetOnInsert(),
DocSubmissionRequest::class.java,
).awaitSingle()
val created = result.matchedCount < 1
return submissionRequestRepository.getByAccNoAndStatusIn(request.accNo, PROCESSING) to created
}

suspend fun archiveRequest(
accNo: String,
version: Int,
): Int {
val matchOperation =
match(
where(RQT_FILE_SUB_ACC_NO)
.`is`(accNo)
.andOperator(where(RQT_FILE_SUB_VERSION).`is`(version)),
)

fun archiveRequestFiles(): Flow<DocSubmissionRequestFile> {
var mergeOperation =
Aggregation
.merge()
.intoCollection(RQT_FILE_ARCH_COL)
.on(Fields.UNDERSCORE_ID)
.whenMatched(WhenDocumentsMatch.replaceDocument())
.whenNotMatched(WhenDocumentsDontMatch.insertNewDocument())
.build()
val aggregation =
Aggregation
.newAggregation(
DocSubmissionRequestFile::class.java,
matchOperation,
mergeOperation,
).withOptions(AggregationOptions.builder().allowDiskUse(true).build())
return mongoTemplate
.aggregate(aggregation, DocSubmissionRequestFile::class.java)
.asFlow()
}

suspend fun archiveRequest(): DocSubmissionRequest {
var mergeOperation =
Aggregation
.merge()
.intoCollection(RQT_ARCH_COL)
.on(Fields.UNDERSCORE_ID)
.whenMatched(WhenDocumentsMatch.replaceDocument())
.whenNotMatched(WhenDocumentsDontMatch.insertNewDocument())
.build()
val aggregation =
Aggregation
.newAggregation(
DocSubmissionRequest::class.java,
matchOperation,
mergeOperation,
).withOptions(AggregationOptions.builder().allowDiskUse(true).build())
return mongoTemplate
.aggregate(aggregation, DocSubmissionRequest::class.java)
.awaitSingle()
}

val archivedFiles =
archiveRequestFiles()
.every(REPORT_RATE) { "$accNo, $version archived file ${it.index}, path='${it.value.path}'" }
.count()
archiveRequest()
return archivedFiles
}

suspend fun findActiveRequests(filter: SubmissionListFilter): Pair<Int, List<DocSubmissionRequest>> {
val query = Query().addCriteria(createQuery(filter))
val requestCount = mongoTemplate.count(query, DocSubmissionRequest::class.java).awaitSingle()
Expand All @@ -90,9 +164,7 @@ class SubmissionRequestDocDataRepository(
suspend fun getRequest(
accNo: String,
version: Int,
): DocSubmissionRequest {
return submissionRequestRepository.getByAccNoAndVersion(accNo, version)
}
): DocSubmissionRequest = submissionRequestRepository.getByAccNoAndVersion(accNo, version)

suspend fun getRequest(
accNo: String,
Expand All @@ -111,14 +183,23 @@ class SubmissionRequestDocDataRepository(
result = null,
)
val update = Update().addToSet(RQT_STATUS_CHANGES, statusChange)
val query = Query(where(RQT_ACC_NO).`is`(accNo).and(RQT_VERSION).`is`(version).and(RQT_STATUS).`is`(status))
val query =
Query(
where(RQT_ACC_NO)
.`is`(accNo)
.and(RQT_VERSION)
.`is`(version)
.and(RQT_STATUS)
.`is`(status),
)
val result =
mongoTemplate.findAndModify(
query,
update,
FindAndModifyOptions.options().returnNew(true),
DocSubmissionRequest::class.java,
).awaitSingle()
mongoTemplate
.findAndModify(
query,
update,
FindAndModifyOptions.options().returnNew(true),
DocSubmissionRequest::class.java,
).awaitSingle()
return statusId.toString() to result
}

Expand All @@ -128,15 +209,17 @@ class SubmissionRequestDocDataRepository(
limit: Int,
): Pair<Int, List<DocSubmissionRequest>> {
val result =
mongoTemplate.find(query.skip(skip).limit(limit), DocSubmissionRequest::class.java)
mongoTemplate
.find(query.skip(skip).limit(limit), DocSubmissionRequest::class.java)
.asFlow()
.toList()
return result.count() to result
}

@Suppress("SpreadOperator")
private fun createQuery(filter: SubmissionListFilter): Criteria =
where("$SUB.$SUB_OWNER").`is`(filter.filterUser)
where("$SUB.$SUB_OWNER")
.`is`(filter.filterUser)
.andOperator(*criteriaArray(filter))

suspend fun increaseIndex(
Expand All @@ -155,7 +238,8 @@ class SubmissionRequestDocDataRepository(
.set(RQT_FILE_INDEX, file.index)
.set(RQT_FILE_STATUS, file.status)
val where =
where(RQT_FILE_SUB_ACC_NO).`is`(file.accNo)
where(RQT_FILE_SUB_ACC_NO)
.`is`(file.accNo)
.andOperator(
where(RQT_FILE_SUB_VERSION).`is`(file.version),
where(RQT_FILE_PATH).`is`(file.path),
Expand All @@ -169,7 +253,8 @@ class SubmissionRequestDocDataRepository(
val serializedFile = extSerializationService.serialize(file.file)
val update = update(RQT_FILE_FILE, BasicDBObject.parse(serializedFile)).set(RQT_FILE_STATUS, file.status)
val where =
where(RQT_FILE_SUB_ACC_NO).`is`(file.accNo)
where(RQT_FILE_SUB_ACC_NO)
.`is`(file.accNo)
.andOperator(
where(RQT_FILE_SUB_VERSION).`is`(file.version),
where(RQT_FILE_PATH).`is`(file.path),
Expand Down Expand Up @@ -201,8 +286,12 @@ class SubmissionRequestDocDataRepository(
) {
val query =
Query(
where(SUB_ACC_NO).`is`(rqt.accNo).and(SUB_VERSION).`is`(rqt.version)
.and("$RQT_STATUS_CHANGES.$RQT_STATUS_CHANGE_STATUS_ID").`is`(ObjectId(processId)),
where(SUB_ACC_NO)
.`is`(rqt.accNo)
.and(SUB_VERSION)
.`is`(rqt.version)
.and("$RQT_STATUS_CHANGES.$RQT_STATUS_CHANGE_STATUS_ID")
.`is`(ObjectId(processId)),
)
val update =
Update()
Expand All @@ -222,15 +311,18 @@ class SubmissionRequestDocDataRepository(
}

private fun criteriaArray(filter: SubmissionListFilter): Array<Criteria> =
ImmutableList.Builder<Criteria>().apply {
add(where(SUB_STATUS).`in`(PROCESSING))
filter.accNo?.let { add(where("$SUB.$SUB_ACC_NO").`is`(it)) }
filter.type?.let { add(where("$SUB.$SUB_SECTION.$SEC_TYPE").`is`(it)) }
filter.rTimeFrom?.let { add(where("$SUB.$SUB_RELEASE_TIME").gte(it.toString())) }
filter.rTimeTo?.let { add(where("$SUB.$SUB_RELEASE_TIME").lte(it.toString())) }
filter.keywords?.let { add(keywordsCriteria(it)) }
filter.released?.let { add(where("$SUB.$SUB_RELEASED").`is`(it)) }
}.build().toTypedArray()
ImmutableList
.Builder<Criteria>()
.apply {
add(where(SUB_STATUS).`in`(PROCESSING))
filter.accNo?.let { add(where("$SUB.$SUB_ACC_NO").`is`(it)) }
filter.type?.let { add(where("$SUB.$SUB_SECTION.$SEC_TYPE").`is`(it)) }
filter.rTimeFrom?.let { add(where("$SUB.$SUB_RELEASE_TIME").gte(it.toString())) }
filter.rTimeTo?.let { add(where("$SUB.$SUB_RELEASE_TIME").lte(it.toString())) }
filter.keywords?.let { add(keywordsCriteria(it)) }
filter.released?.let { add(where("$SUB.$SUB_RELEASED").`is`(it)) }
}.build()
.toTypedArray()

private fun keywordsCriteria(keywords: String) =
Criteria().orOperator(
Expand All @@ -239,6 +331,10 @@ class SubmissionRequestDocDataRepository(
where(ATTRIBUTE_DOC_NAME).`is`("Title").and(ATTRIBUTE_DOC_VALUE).regex("(?i).*$keywords.*"),
),
)

companion object {
const val REPORT_RATE = 200
}
}

enum class ProcessResult {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ac.uk.ebi.biostd.persistence.doc.db.data

import ac.uk.ebi.biostd.persistence.doc.db.reactive.repositories.SubmissionRequestFilesRepository

class SubmissionRequestFilesDocDataRepository(
private val submissionRequestFilesRepository: SubmissionRequestFilesRepository,
) : SubmissionRequestFilesRepository by submissionRequestFilesRepository
Loading

0 comments on commit c8cc4ba

Please sign in to comment.