Skip to content

Commit

Permalink
Don't move message if not processed
Browse files Browse the repository at this point in the history
Addresses RISDEV-2573
  • Loading branch information
zechmeister committed Oct 12, 2023
1 parent 52d75c4 commit d0f56e5
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Component
public class JurisXmlExporterResponseProcessor {
Expand Down Expand Up @@ -74,25 +73,18 @@ private void processInbox(Store store) {
.flatMap(Optional::stream)
.sorted(
Comparator.comparing(wrapper -> wrapper instanceof ImportMessageWrapper ? 0 : 1))
.map(this::processMessage)
.map(this::forwardMessage)
.flatMap(Optional::stream)
.map(this::setPublicationStatus)
.map(this::saveAttachments)
.toList();
moveMessages(processedMessages, inbox, store.getFolder("processed"));
} catch (MessagingException e) {
throw new StatusImporterException("Error processing inbox: " + e);
}
}

private MessageWrapper processMessage(MessageWrapper messageWrapper) {
return Mono.just(messageWrapper)
.flatMap(this::forwardMessage)
.flatMap(this::setPublicationStatus)
.flatMap(this::saveAttachments)
.doOnSuccess(result -> LOGGER.info("Message processed for: {}", messageWrapper))
.doOnError(e -> LOGGER.error("Error processing message: ", e))
.block();
}

private Mono<MessageWrapper> saveAttachments(MessageWrapper messageWrapper) {
private MessageWrapper saveAttachments(MessageWrapper messageWrapper) {
PolicyFactory policy =
new HtmlPolicyBuilder()
.allowElements(
Expand Down Expand Up @@ -127,9 +119,10 @@ private Mono<MessageWrapper> saveAttachments(MessageWrapper messageWrapper) {
.build())
.toList())
.collectList()
.thenReturn(messageWrapper);
.thenReturn(messageWrapper)
.block();
} catch (MessagingException | IOException e) {
return Mono.error(new StatusImporterException("Error saving attachments"));
throw new StatusImporterException("Error saving attachments");
}
}

Expand All @@ -153,7 +146,7 @@ private List<Attachment> collectAttachments(MessageWrapper messageWrapper)
.toList();
}

private Mono<MessageWrapper> setPublicationStatus(MessageWrapper messageWrapper) {
private MessageWrapper setPublicationStatus(MessageWrapper messageWrapper) {
try {
return statusService
.update(
Expand All @@ -162,46 +155,40 @@ private Mono<MessageWrapper> setPublicationStatus(MessageWrapper messageWrapper)
.publicationStatus(getPublicationStatus(messageWrapper.isPublished()))
.withError(messageWrapper.hasErrors())
.build())
.thenReturn(messageWrapper);
.thenReturn(messageWrapper)
.block();
} catch (MessagingException | IOException e) {
return Mono.error(new StatusImporterException("Could not update publicationStatus" + e));
throw new StatusImporterException("Could not update publicationStatus" + e);
} catch (NullPointerException ex) {
LOGGER.error("NPE with messageWrapper: {}", messageWrapper, ex);
return Mono.error(new StatusImporterException("Could not update publicationStatus" + ex));
throw new StatusImporterException("Could not update publicationStatus" + ex);
}
}

private Mono<MessageWrapper> forwardMessage(MessageWrapper messageWrapper) {
private Optional<MessageWrapper> forwardMessage(MessageWrapper messageWrapper) {
try {
String documentNumber = messageWrapper.getDocumentNumber();
String subject = messageWrapper.getSubject();
List<Attachment> attachments = collectAttachments(messageWrapper);

return statusService
.getLatestIssuerAddress(documentNumber)
.flatMap(
issuerAddress ->
Mono.fromRunnable(
() ->
mailSender.sendMail(
storeFactory.getUsername(),
issuerAddress,
"FWD: " + subject,
"Anbei weitergeleitet von der jDV:",
attachments,
"report-" + documentNumber)))
.switchIfEmpty(
Mono.fromRunnable(
() ->
LOGGER.info(
"Could not forward JurisResponse (DocumentUnit not found): {}",
documentNumber)))
.thenReturn(messageWrapper);
return Optional.ofNullable(statusService.getLatestIssuerAddress(documentNumber).block())
.map(
issuerAddress -> {
mailSender.sendMail(
storeFactory.getUsername(),
issuerAddress,
"FWD: " + subject,
"Anbei weitergeleitet von der jDV:",
attachments,
"report-" + documentNumber);
return messageWrapper;
});

} catch (MessagingException | IOException e) {
return Mono.error(new StatusImporterException("Could not forward Message"));
throw new StatusImporterException("Could not forward Message");
} catch (NullPointerException ex) {
LOGGER.error("NPE with messageWrapper: {}", messageWrapper, ex);
return Mono.error(new StatusImporterException("Could not forward Message"));
throw new StatusImporterException("Could not forward Message");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,19 @@ void testMessageGetsNotMovedIfNotForwarded() throws MessagingException {
});

verifyNoInteractions(mailSender);
verify(inbox, never()).getFolder("processed");
verify(inbox, never()).copyMessages(any(), any());
verify(importMessage, never()).setFlag(Flag.DELETED, true);
}

@Test
void testMessageGetsNotMovedIfDocumentNumberNotFound() throws MessagingException {
when(inbox.getMessages()).thenReturn(new Message[] {importMessage});
when(statusService.getLatestIssuerAddress(DOCUMENT_NUMBER)).thenReturn(Mono.empty());

responseProcessor.readEmails();

verifyNoInteractions(mailSender);
verify(inbox, never()).copyMessages(new Message[] {importMessage}, processed);
verify(importMessage, never()).setFlag(Flag.DELETED, true);
}

Expand Down

0 comments on commit d0f56e5

Please sign in to comment.