Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pivotal ID # 186425798: Regenerate all PMC submission FTP #779

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package ac.uk.ebi.biostd.client.api

import ac.uk.ebi.biostd.client.integration.web.GeneralOperations
import ebi.ac.uk.api.dto.UserGroupDto
import ebi.ac.uk.commons.http.builder.linkedMultiValueMapOf
import ebi.ac.uk.commons.http.ext.RequestParams
import ebi.ac.uk.commons.http.ext.getForObject
import ebi.ac.uk.commons.http.ext.postAsync
import ebi.ac.uk.commons.http.ext.postForObject
import ebi.ac.uk.commons.http.ext.put
import ebi.ac.uk.model.Collection
import ebi.ac.uk.model.Group
import org.springframework.util.LinkedMultiValueMap
import org.springframework.web.reactive.function.client.WebClient

private const val GROUP_URL = "/groups"
Expand All @@ -26,9 +27,9 @@ class CommonOperationsClient(
return client.getForObject<Array<Collection>>(PROJECTS_URL).toList()
}

override fun generateFtpLink(relPath: String) {
val body = LinkedMultiValueMap(mapOf("relPath" to listOf(relPath)))
client.postForObject<String>("$FTP_URL/generate", RequestParams(body = body))
override suspend fun generateFtpLinks(accNo: String) {
val body = linkedMultiValueMapOf("accNo" to accNo)
client.postAsync("$FTP_URL/generate", RequestParams(body = body))
}

override fun createGroup(groupName: String, groupDescription: String): UserGroupDto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ interface SecurityOperations {
interface GeneralOperations {
fun getGroups(): List<Group>
fun getCollections(): List<Collection>
fun generateFtpLink(relPath: String)
suspend fun generateFtpLinks(accNo: String)
fun createGroup(groupName: String, groupDescription: String): UserGroupDto
fun addUserInGroup(groupName: String, userName: String)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ suspend inline fun <reified T> WebClient.putForObjectAsync(url: String, params:
return put().retrieveAsync<T>(url, params)!!
}

suspend fun WebClient.postAsync(url: String, params: RequestParams? = null) {
val uriSpec = post().uri(url)
params?.headers?.let { headers -> uriSpec.headers { it.addAll(headers) } }
params?.body?.let { body -> uriSpec.bodyValue(body) }

val response = uriSpec.awaitExchange()
require(response.statusCode().isError.not()) { response.toString() }
}

suspend fun WebClient.deleteAsync(url: String) {
val response = delete().uri(url).awaitExchange()
require(response.statusCode().isError.not()) { response.toString() }
Expand Down
2 changes: 2 additions & 0 deletions scheduler/tasks/submission-releaser-task/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import Dependencies.KotlinCoroutines
import Dependencies.KotlinLogging
import Dependencies.KotlinStdLib
import Projects.ClientBioWebClient
Expand Down Expand Up @@ -29,6 +30,7 @@ dependencies {
api(project(SubmissionPersistenceCommonApi))
api(project(SubmissionPersistenceMongo))

implementation(KotlinCoroutines)
implementation(KotlinLogging)
implementation(KotlinStdLib)
implementation(SpringBootStarterAmqp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package uk.ac.ebi.scheduler.releaser
import ac.uk.ebi.scheduler.properties.ReleaserMode.GENERATE_FTP_LINKS
import ac.uk.ebi.scheduler.properties.ReleaserMode.NOTIFY
import ac.uk.ebi.scheduler.properties.ReleaserMode.RELEASE
import kotlinx.coroutines.runBlocking
import org.springframework.boot.CommandLineRunner
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.EnableConfigurationProperties
Expand Down Expand Up @@ -30,10 +31,12 @@ class SubmissionReleaserExecutor(
private lateinit var context: ApplicationContext

override fun run(vararg args: String?) {
when (applicationProperties.mode) {
NOTIFY -> submissionReleaserService.notifySubmissionReleases()
RELEASE -> submissionReleaserService.releaseDailySubmissions()
GENERATE_FTP_LINKS -> submissionReleaserService.generateFtpLinks()
runBlocking {
when (applicationProperties.mode) {
NOTIFY -> submissionReleaserService.notifySubmissionReleases()
RELEASE -> submissionReleaserService.releaseDailySubmissions()
GENERATE_FTP_LINKS -> submissionReleaserService.generateFtpLinks()
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ package uk.ac.ebi.scheduler.releaser.config

import ac.uk.ebi.biostd.client.integration.web.BioWebClient
import ac.uk.ebi.biostd.client.integration.web.SecurityWebClient
import ac.uk.ebi.biostd.persistence.doc.MongoDbReactiveConfig
import ac.uk.ebi.biostd.persistence.doc.db.reactive.repositories.SubmissionReleaserRepository
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import uk.ac.ebi.events.config.EventsProperties
import uk.ac.ebi.scheduler.releaser.SubmissionReleaserExecutor
import uk.ac.ebi.scheduler.releaser.persistence.ReleaserRepository
import uk.ac.ebi.scheduler.releaser.service.EventsPublisherService
import uk.ac.ebi.scheduler.releaser.service.SubmissionReleaserService

@Configuration
@Import(MongoDbReactiveConfig::class)
class ApplicationConfig(
private val releaserRepository: ReleaserRepository,
private val releaserRepository: SubmissionReleaserRepository,
private val appProperties: ApplicationProperties,
) {
@Bean
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
package uk.ac.ebi.scheduler.releaser.service

import ac.uk.ebi.biostd.client.dto.ReleaseRequestDto
import ac.uk.ebi.biostd.client.integration.web.BioWebClient
import ac.uk.ebi.biostd.persistence.doc.db.reactive.repositories.SubmissionReleaserRepository
import ac.uk.ebi.biostd.persistence.doc.db.repositories.ReleaseData
import ebi.ac.uk.util.date.asOffsetAtEndOfDay
import ebi.ac.uk.util.date.asOffsetAtStartOfDay
import ebi.ac.uk.util.date.toDate
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.withContext
import mu.KotlinLogging
import uk.ac.ebi.scheduler.releaser.config.NotificationTimes
import uk.ac.ebi.scheduler.releaser.model.ReleaseData
import uk.ac.ebi.scheduler.releaser.persistence.ReleaserRepository
import java.time.Instant
import java.time.temporal.ChronoUnit.DAYS

Expand All @@ -16,29 +21,35 @@ private val logger = KotlinLogging.logger {}
class SubmissionReleaserService(
private val bioWebClient: BioWebClient,
private val notificationTimes: NotificationTimes,
private val releaserRepository: ReleaserRepository,
private val releaserRepository: SubmissionReleaserRepository,
private val eventsPublisherService: EventsPublisherService,
) {
fun notifySubmissionReleases() {
suspend fun notifySubmissionReleases() {
val today = Instant.now()
notifyRelease(today.plus(notificationTimes.firstWarningDays, DAYS))
notifyRelease(today.plus(notificationTimes.secondWarningDays, DAYS))
notifyRelease(today.plus(notificationTimes.thirdWarningDays, DAYS))
}

fun releaseDailySubmissions() {
suspend fun releaseDailySubmissions() {
val to = Instant.now().asOffsetAtEndOfDay()
logger.info { "Releasing submissions up to $to" }

releaserRepository
.findAllUntil(to.toDate())
.forEach(::releaseSafely)
withContext(Dispatchers.Default) {
releaserRepository
.findAllUntil(to.toDate())
.map { async { releaseSafely(it) } }
.collect { it.await() }
}
}

fun generateFtpLinks() {
releaserRepository
.findAllReleased()
.forEach(::generateFtpLink)
suspend fun generateFtpLinks() {
withContext(Dispatchers.Default) {
releaserRepository
.findAllReleased()
.map { async { generateFtpLinks(it) } }
.collect { it.await() }
}
}

private fun releaseSafely(releaseData: ReleaseData) {
Expand All @@ -49,23 +60,29 @@ class SubmissionReleaserService(
.onSuccess { logger.info { "Released submission ${releaseData.accNo}" } }
}

private fun notifyRelease(date: Instant) {
private fun ReleaseData.asReleaseDto() = ReleaseRequestDto(accNo, owner, relPath)

private suspend fun notifyRelease(date: Instant) {
val from = date.asOffsetAtStartOfDay()
val to = date.asOffsetAtEndOfDay()

logger.info { "Notifying submissions releases from $from to $to" }
releaserRepository
.findAllBetween(from.toDate(), to.toDate())
.forEach(::notify)

withContext(Dispatchers.Default) {
releaserRepository
.findAllBetween(from.toDate(), to.toDate())
.map { async { notify(it) } }
.collect { it.await() }
}
}

private fun notify(releaseData: ReleaseData) {
logger.info { "Notifying submission release for ${releaseData.accNo}" }
eventsPublisherService.subToBePublished(releaseData.accNo, releaseData.owner)
}

private fun generateFtpLink(releaseData: ReleaseData) {
logger.info { "Generating FTP link for submission ${releaseData.accNo}" }
bioWebClient.generateFtpLink(releaseData.relPath)
private suspend fun generateFtpLinks(releaseData: ReleaseData) {
logger.info { "Generating FTP links for submission ${releaseData.accNo}" }
bioWebClient.generateFtpLinks(releaseData.accNo)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,27 @@ package uk.ac.ebi.scheduler.releaser.service

import ac.uk.ebi.biostd.client.dto.ReleaseRequestDto
import ac.uk.ebi.biostd.client.integration.web.BioWebClient
import ac.uk.ebi.biostd.persistence.doc.db.reactive.repositories.SubmissionReleaserRepository
import ac.uk.ebi.biostd.persistence.doc.db.repositories.ReleaseData
import ebi.ac.uk.util.date.asOffsetAtEndOfDay
import ebi.ac.uk.util.date.toDate
import io.mockk.clearAllMocks
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension
import io.mockk.mockkStatic
import io.mockk.slot
import io.mockk.verify
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.test.runTest
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import uk.ac.ebi.scheduler.releaser.config.NotificationTimes
import uk.ac.ebi.scheduler.releaser.model.ReleaseData
import uk.ac.ebi.scheduler.releaser.persistence.ReleaserRepository
import java.time.Instant
import java.time.OffsetDateTime
import java.time.ZoneOffset.UTC
Expand All @@ -28,7 +32,7 @@ import java.util.Date
class SubmissionReleaserServiceTest(
@MockK private val bioWebClient: BioWebClient,
@MockK private val notificationTimes: NotificationTimes,
@MockK private val releaserRepository: ReleaserRepository,
@MockK private val releaserRepository: SubmissionReleaserRepository,
@MockK private val eventsPublisherService: EventsPublisherService,
) {
private val mockNow = OffsetDateTime.of(2020, 9, 21, 10, 11, 0, 0, UTC).toInstant()
Expand All @@ -46,7 +50,7 @@ class SubmissionReleaserServiceTest(
}

@Test
fun `notify submission release`() {
fun `notify submission release`() = runTest {
val firstWarningData = ReleaseData("S-BSST0", "owner0@mail.org", "S-BSST/000/S-BSST0")
val secondWarningData = ReleaseData("S-BSST1", "owner1@mail.org", "S-BSST/001/S-BSST1")
val thirdWarningData = ReleaseData("S-BSST2", "owner2@mail.org", "S-BSST/002/S-BSST2")
Expand All @@ -65,13 +69,13 @@ class SubmissionReleaserServiceTest(
@Test
fun `release daily submissions`(
@MockK to: Date
) {
) = runTest {
val requestSlot = slot<ReleaseRequestDto>()
val released = ReleaseData("S-BSST0", "owner0@mail.org", "S-BSST/000/S-BSST0")

every { mockNow.asOffsetAtEndOfDay().toDate() } returns to
every { bioWebClient.releaseSubmission(capture(requestSlot)) } answers { nothing }
every { releaserRepository.findAllUntil(to) } returns listOf(released)
every { releaserRepository.findAllUntil(to) } returns flowOf(released)

testInstance.releaseDailySubmissions()

Expand All @@ -83,21 +87,21 @@ class SubmissionReleaserServiceTest(
}

@Test
fun `generate ftp links`() {
fun `generate ftp links`() = runTest {
val released = ReleaseData("S-BSST0", "owner0@mail.org", "S-BSST/000/S-BSST0")

every { releaserRepository.findAllReleased() } returns listOf(released)
every { bioWebClient.generateFtpLink("S-BSST/000/S-BSST0") } answers { nothing }
every { releaserRepository.findAllReleased() } returns flowOf(released)
coEvery { bioWebClient.generateFtpLinks("S-BSST0") } answers { nothing }

testInstance.generateFtpLinks()
verify(exactly = 1) { bioWebClient.generateFtpLink("S-BSST/000/S-BSST0") }
coVerify(exactly = 1) { bioWebClient.generateFtpLinks("S-BSST0") }
}

private fun mockNotificationQuery(month: Int, day: Int, response: ReleaseData) {
val from = OffsetDateTime.of(2020, month, day, 0, 0, 0, 0, UTC).toDate()
val to = OffsetDateTime.of(2020, month, day, 23, 59, 59, 0, UTC).toDate()

every { releaserRepository.findAllBetween(from, to) } returns listOf(response)
every { releaserRepository.findAllBetween(from, to) } returns flowOf(response)
every { eventsPublisherService.subToBePublished(response.accNo, response.owner) } answers { nothing }
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
package uk.ac.ebi.scheduler.releaser.persistence
package ac.uk.ebi.biostd.persistence.doc.db.reactive.repositories

import ac.uk.ebi.biostd.persistence.doc.db.repositories.ReleaseData
import ac.uk.ebi.biostd.persistence.doc.model.DocSubmission
import kotlinx.coroutines.flow.Flow
import org.bson.types.ObjectId
import org.springframework.data.mongodb.repository.Query
import org.springframework.data.repository.PagingAndSortingRepository
import uk.ac.ebi.scheduler.releaser.model.ReleaseData
import org.springframework.data.repository.kotlin.CoroutineCrudRepository
import java.util.Date

interface ReleaserRepository : PagingAndSortingRepository<DocSubmission, ObjectId> {
interface SubmissionReleaserRepository : CoroutineCrudRepository<DocSubmission, ObjectId> {
@Query(
value = "{ released: true, version: { \$gte: 0 } }",
fields = "{ accNo: 1, owner: 1, relPath: 1 }"
)
fun findAllReleased(): List<ReleaseData>
fun findAllReleased(): Flow<ReleaseData>

@Query(
value = "{ releaseTime: { \$lte: ?0 }, released: false, version: { \$gte: 0 } }",
fields = "{ accNo: 1, owner: 1, relPath: 1 }"
)
fun findAllUntil(toRTime: Date): List<ReleaseData>
fun findAllUntil(toRTime: Date): Flow<ReleaseData>

@Query(
value = "{ releaseTime: { \$gte: ?0, \$lte: ?1 }, released: false, version: { \$gte: 0 } }",
fields = "{ accNo: 1, owner: 1, relPath: 1 }"
)
fun findAllBetween(fromRTime: Date, toRTime: Date): List<ReleaseData>
fun findAllBetween(fromRTime: Date, toRTime: Date): Flow<ReleaseData>
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,9 @@ import ac.uk.ebi.biostd.persistence.doc.model.DocCollection
data class SubmissionCollections(val collections: List<DocCollection>?)

data class MigrationData(val accNo: String)

data class ReleaseData(
val accNo: String,
val owner: String,
val relPath: String
)
Loading