diff --git a/backend/src/main/java/de/bund/digitalservice/ris/caselaw/adapter/JurisXmlExporterResponseProcessor.java b/backend/src/main/java/de/bund/digitalservice/ris/caselaw/adapter/JurisXmlExporterResponseProcessor.java index 61d0b04a83..3f208bba33 100644 --- a/backend/src/main/java/de/bund/digitalservice/ris/caselaw/adapter/JurisXmlExporterResponseProcessor.java +++ b/backend/src/main/java/de/bund/digitalservice/ris/caselaw/adapter/JurisXmlExporterResponseProcessor.java @@ -28,7 +28,6 @@ import org.slf4j.LoggerFactory; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import reactor.core.publisher.Mono; @Component public class JurisXmlExporterResponseProcessor { @@ -74,7 +73,10 @@ private void processInbox(Store store) { .flatMap(Optional::stream) .sorted( Comparator.comparing(wrapper -> wrapper instanceof ImportMessageWrapper ? 0 : 1)) - .map(this::processMessage) + .map(this::forwardMessage) + .flatMap(Optional::stream) + .map(this::setPublicationStatus) + .map(this::saveAttachments) .toList(); moveMessages(processedMessages, inbox, store.getFolder("processed")); } catch (MessagingException e) { @@ -82,17 +84,7 @@ private void processInbox(Store store) { } } - private MessageWrapper processMessage(MessageWrapper messageWrapper) { - return Mono.just(messageWrapper) - .flatMap(this::forwardMessage) - .flatMap(this::setPublicationStatus) - .flatMap(this::saveAttachments) - .doOnSuccess(result -> LOGGER.info("Message processed for: {}", messageWrapper)) - .doOnError(e -> LOGGER.error("Error processing message: ", e)) - .block(); - } - - private Mono saveAttachments(MessageWrapper messageWrapper) { + private MessageWrapper saveAttachments(MessageWrapper messageWrapper) { PolicyFactory policy = new HtmlPolicyBuilder() .allowElements( @@ -127,9 +119,10 @@ private Mono saveAttachments(MessageWrapper messageWrapper) { .build()) .toList()) .collectList() - .thenReturn(messageWrapper); + .thenReturn(messageWrapper) + .block(); } catch (MessagingException | IOException e) { - return Mono.error(new StatusImporterException("Error saving attachments")); + throw new StatusImporterException("Error saving attachments"); } } @@ -153,7 +146,7 @@ private List collectAttachments(MessageWrapper messageWrapper) .toList(); } - private Mono setPublicationStatus(MessageWrapper messageWrapper) { + private MessageWrapper setPublicationStatus(MessageWrapper messageWrapper) { try { return statusService .update( @@ -162,46 +155,40 @@ private Mono setPublicationStatus(MessageWrapper messageWrapper) .publicationStatus(getPublicationStatus(messageWrapper.isPublished())) .withError(messageWrapper.hasErrors()) .build()) - .thenReturn(messageWrapper); + .thenReturn(messageWrapper) + .block(); } catch (MessagingException | IOException e) { - return Mono.error(new StatusImporterException("Could not update publicationStatus" + e)); + throw new StatusImporterException("Could not update publicationStatus" + e); } catch (NullPointerException ex) { LOGGER.error("NPE with messageWrapper: {}", messageWrapper, ex); - return Mono.error(new StatusImporterException("Could not update publicationStatus" + ex)); + throw new StatusImporterException("Could not update publicationStatus" + ex); } } - private Mono forwardMessage(MessageWrapper messageWrapper) { + private Optional forwardMessage(MessageWrapper messageWrapper) { try { String documentNumber = messageWrapper.getDocumentNumber(); String subject = messageWrapper.getSubject(); List attachments = collectAttachments(messageWrapper); - return statusService - .getLatestIssuerAddress(documentNumber) - .flatMap( - issuerAddress -> - Mono.fromRunnable( - () -> - mailSender.sendMail( - storeFactory.getUsername(), - issuerAddress, - "FWD: " + subject, - "Anbei weitergeleitet von der jDV:", - attachments, - "report-" + documentNumber))) - .switchIfEmpty( - Mono.fromRunnable( - () -> - LOGGER.info( - "Could not forward JurisResponse (DocumentUnit not found): {}", - documentNumber))) - .thenReturn(messageWrapper); + return Optional.ofNullable(statusService.getLatestIssuerAddress(documentNumber).block()) + .map( + issuerAddress -> { + mailSender.sendMail( + storeFactory.getUsername(), + issuerAddress, + "FWD: " + subject, + "Anbei weitergeleitet von der jDV:", + attachments, + "report-" + documentNumber); + return messageWrapper; + }); + } catch (MessagingException | IOException e) { - return Mono.error(new StatusImporterException("Could not forward Message")); + throw new StatusImporterException("Could not forward Message"); } catch (NullPointerException ex) { LOGGER.error("NPE with messageWrapper: {}", messageWrapper, ex); - return Mono.error(new StatusImporterException("Could not forward Message")); + throw new StatusImporterException("Could not forward Message"); } } diff --git a/backend/src/test/java/de/bund/digitalservice/ris/caselaw/adapter/JurisXmlExporterResponseProcessorTest.java b/backend/src/test/java/de/bund/digitalservice/ris/caselaw/adapter/JurisXmlExporterResponseProcessorTest.java index 8df6f78a05..4bc1dd6b77 100644 --- a/backend/src/test/java/de/bund/digitalservice/ris/caselaw/adapter/JurisXmlExporterResponseProcessorTest.java +++ b/backend/src/test/java/de/bund/digitalservice/ris/caselaw/adapter/JurisXmlExporterResponseProcessorTest.java @@ -124,7 +124,19 @@ void testMessageGetsNotMovedIfNotForwarded() throws MessagingException { }); verifyNoInteractions(mailSender); - verify(inbox, never()).getFolder("processed"); + verify(inbox, never()).copyMessages(any(), any()); + verify(importMessage, never()).setFlag(Flag.DELETED, true); + } + + @Test + void testMessageGetsNotMovedIfDocumentNumberNotFound() throws MessagingException { + when(inbox.getMessages()).thenReturn(new Message[] {importMessage}); + when(statusService.getLatestIssuerAddress(DOCUMENT_NUMBER)).thenReturn(Mono.empty()); + + responseProcessor.readEmails(); + + verifyNoInteractions(mailSender); + verify(inbox, never()).copyMessages(new Message[] {importMessage}, processed); verify(importMessage, never()).setFlag(Flag.DELETED, true); }