Skip to content

Commit

Permalink
Pivotal # 187104619: Submission Notification Listener WebClient Buffe…
Browse files Browse the repository at this point in the history
…r Size (#810)

https://www.pivotaltracker.com/story/show/187104619

- Fix buffer size for the handler web client
- Improve error logging for failed notifications
  • Loading branch information
jhoanmanuelms authored Feb 27, 2024
1 parent f1b6fcb commit 489c9ee
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
package ac.uk.ebi.biostd.handlers

import org.springframework.amqp.rabbit.annotation.EnableRabbit
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.MessageConverter
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean
import org.springframework.web.reactive.function.client.WebClient

@Suppress("SpreadOperator")
fun main(args: Array<String>) {
Expand All @@ -15,10 +11,4 @@ fun main(args: Array<String>) {

@SpringBootApplication
@EnableRabbit
class HandlersApplication {
@Bean
fun jsonMessageConverter(): MessageConverter = Jackson2JsonMessageConverter()

@Bean
fun webClient(): WebClient = WebClient.builder().build()
}
class HandlersApplication
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.MessageConverter
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.core.io.ResourceLoader
import org.springframework.http.codec.ClientCodecConfigurer
import org.springframework.web.reactive.function.client.ExchangeStrategies
import org.springframework.web.reactive.function.client.WebClient
import uk.ac.ebi.extended.serialization.integration.ExtSerializationConfig
import uk.ac.ebi.extended.serialization.service.ExtSerializationService
Expand Down Expand Up @@ -78,7 +81,22 @@ class Listeners {
}

@Configuration
class Services {
class WebConfig {
@Bean
fun jsonMessageConverter(): MessageConverter = Jackson2JsonMessageConverter()

@Bean
fun webClient(): WebClient {
val exchangeStrategies =
ExchangeStrategies.builder().codecs { configurer ->
if (configurer is ClientCodecConfigurer) configurer.defaultCodecs().maxInMemorySize(-1)
}.build()

return WebClient.builder()
.exchangeStrategies(exchangeStrategies)
.build()
}

@Bean
fun bioStudiesWebConsumer(
client: WebClient,
Expand All @@ -90,7 +108,10 @@ class Services {
client: WebClient,
applicationProperties: ApplicationProperties
): NotificationsSender = NotificationsSender(client, applicationProperties.notifications.slackUrl)
}

@Configuration
class Services {
@Bean
fun extSerializationService(): ExtSerializationService = ExtSerializationConfig.extSerializationService()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import mu.KotlinLogging
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.amqp.rabbit.core.RabbitTemplate

private val logger = KotlinLogging.logger {}
private const val ERROR_MESSAGE = "Problem processing notification for submission %s"

class SubmissionNotificationsListener(
private val rabbitTemplate: RabbitTemplate,
private val webConsumer: BioStudiesWebConsumer,
Expand All @@ -34,7 +31,7 @@ class SubmissionNotificationsListener(
fun receiveSubmissionMessage(message: SubmissionMessage) {
logger.info { "Submission Notification for ${message.accNo}" }

notifySafely(message) {
notifySafely(message, SUCCESSFUL_SUBMISSION_NOTIFICATION) {
val owner = webConsumer.getExtUser(message.extUserUrl)
if (owner.notificationsEnabled) {
val sub = webConsumer.getExtSubmission(message.extTabUrl)
Expand All @@ -48,7 +45,7 @@ class SubmissionNotificationsListener(
fun receiveSubmissionReleaseMessage(message: SubmissionMessage) {
logger.info { "Release notification for ${message.accNo}" }

notifySafely(message) {
notifySafely(message, RELEASE_NOTIFICATION) {
val owner = webConsumer.getExtUser(message.extUserUrl)
if (owner.notificationsEnabled) {
val sub = webConsumer.getExtSubmission(message.extTabUrl)
Expand All @@ -66,8 +63,18 @@ class SubmissionNotificationsListener(
}
}

private fun notifySafely(message: SubmissionMessage, notifyFunction: SubmissionMessage.() -> Unit) = runBlocking {
runCatching { notifyFunction(message) }.onFailure { onError(message) }
private fun notifySafely(
message: SubmissionMessage,
notificationType: String,
notifyFunction: SubmissionMessage.() -> Unit
) = runBlocking {
runCatching {
notifyFunction(message)
}.onFailure {
onError(message)
val errorMsg = "Error processing notification of type '$notificationType' for submission '${message.accNo}"
logger.error(it) { "$errorMsg': ${it.message ?: it.localizedMessage}" }
}
}

private suspend fun onError(message: SubmissionMessage) {
Expand All @@ -81,4 +88,12 @@ class SubmissionNotificationsListener(
else -> "${notificationProps.uiUrl}/${col.accNo.lowercase()}"
}
}

companion object {
private val logger = KotlinLogging.logger {}

private const val ERROR_MESSAGE = "Problem processing notification for submission %s"
private const val RELEASE_NOTIFICATION = "Release Notification"
private const val SUCCESSFUL_SUBMISSION_NOTIFICATION = "Successful Submission Notification"
}
}

0 comments on commit 489c9ee

Please sign in to comment.