Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pivotal # 187104619: Submission Notification Listener WebClient Buffer Size #810

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package ac.uk.ebi.biostd.handlers

import org.springframework.amqp.rabbit.annotation.EnableRabbit
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.MessageConverter
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.web.reactive.function.client.WebClient

@Suppress("SpreadOperator")
fun main(args: Array<String>) {
Expand All @@ -15,10 +11,4 @@ fun main(args: Array<String>) {

@SpringBootApplication
@EnableRabbit
class HandlersApplication {
@Bean
fun jsonMessageConverter(): MessageConverter = Jackson2JsonMessageConverter()

@Bean
fun webClient(): WebClient = WebClient.builder().build()
}
class HandlersApplication
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.MessageConverter
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.core.io.ResourceLoader
import org.springframework.http.codec.ClientCodecConfigurer
import org.springframework.web.reactive.function.client.ExchangeStrategies
import org.springframework.web.reactive.function.client.WebClient
import uk.ac.ebi.extended.serialization.integration.ExtSerializationConfig
import uk.ac.ebi.extended.serialization.service.ExtSerializationService
Expand Down Expand Up @@ -78,7 +81,22 @@ class Listeners {
}

@Configuration
class Services {
class WebConfig {
@Bean
fun jsonMessageConverter(): MessageConverter = Jackson2JsonMessageConverter()

@Bean
fun webClient(): WebClient {
val exchangeStrategies =
ExchangeStrategies.builder().codecs { configurer ->
if (configurer is ClientCodecConfigurer) configurer.defaultCodecs().maxInMemorySize(-1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this for?

}.build()

return WebClient.builder()
.exchangeStrategies(exchangeStrategies)
.build()
}

@Bean
fun bioStudiesWebConsumer(
client: WebClient,
Expand All @@ -90,7 +108,10 @@ class Services {
client: WebClient,
applicationProperties: ApplicationProperties
): NotificationsSender = NotificationsSender(client, applicationProperties.notifications.slackUrl)
}

@Configuration
class Services {
@Bean
fun extSerializationService(): ExtSerializationService = ExtSerializationConfig.extSerializationService()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import mu.KotlinLogging
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.amqp.rabbit.core.RabbitTemplate

private val logger = KotlinLogging.logger {}
private const val ERROR_MESSAGE = "Problem processing notification for submission %s"

class SubmissionNotificationsListener(
private val rabbitTemplate: RabbitTemplate,
private val webConsumer: BioStudiesWebConsumer,
Expand All @@ -34,7 +31,7 @@ class SubmissionNotificationsListener(
fun receiveSubmissionMessage(message: SubmissionMessage) {
logger.info { "Submission Notification for ${message.accNo}" }

notifySafely(message) {
notifySafely(message, SUCCESSFUL_SUBMISSION_NOTIFICATION) {
val owner = webConsumer.getExtUser(message.extUserUrl)
if (owner.notificationsEnabled) {
val sub = webConsumer.getExtSubmission(message.extTabUrl)
Expand All @@ -48,7 +45,7 @@ class SubmissionNotificationsListener(
fun receiveSubmissionReleaseMessage(message: SubmissionMessage) {
logger.info { "Release notification for ${message.accNo}" }

notifySafely(message) {
notifySafely(message, RELEASE_NOTIFICATION) {
val owner = webConsumer.getExtUser(message.extUserUrl)
if (owner.notificationsEnabled) {
val sub = webConsumer.getExtSubmission(message.extTabUrl)
Expand All @@ -66,8 +63,18 @@ class SubmissionNotificationsListener(
}
}

private fun notifySafely(message: SubmissionMessage, notifyFunction: SubmissionMessage.() -> Unit) = runBlocking {
runCatching { notifyFunction(message) }.onFailure { onError(message) }
private fun notifySafely(
message: SubmissionMessage,
notificationType: String,
notifyFunction: SubmissionMessage.() -> Unit
) = runBlocking {
runCatching {
notifyFunction(message)
}.onFailure {
onError(message)
val errorMsg = "Error processing notification of type '$notificationType' for submission '${message.accNo}"
logger.error(it) { "$errorMsg': ${it.message ?: it.localizedMessage}" }
}
}

private suspend fun onError(message: SubmissionMessage) {
Expand All @@ -81,4 +88,12 @@ class SubmissionNotificationsListener(
else -> "${notificationProps.uiUrl}/${col.accNo.lowercase()}"
}
}

companion object {
private val logger = KotlinLogging.logger {}

private const val ERROR_MESSAGE = "Problem processing notification for submission %s"
private const val RELEASE_NOTIFICATION = "Release Notification"
private const val SUCCESSFUL_SUBMISSION_NOTIFICATION = "Successful Submission Notification"
}
}