diff --git a/src/main/java/com/czertainly/core/config/AsyncConfig.java b/src/main/java/com/czertainly/core/config/AsyncConfig.java index 0ac0024c..96dc8cbb 100644 --- a/src/main/java/com/czertainly/core/config/AsyncConfig.java +++ b/src/main/java/com/czertainly/core/config/AsyncConfig.java @@ -1,5 +1,8 @@ package com.czertainly.core.config; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; @@ -12,9 +15,16 @@ @EnableAsync public class AsyncConfig implements AsyncConfigurer { + private static final Logger logger = LoggerFactory.getLogger(AsyncConfig.class); + @Override public Executor getAsyncExecutor() { Executor virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor(); return new DelegatingSecurityContextExecutor(virtualThreadExecutor); } + + @Override + public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { + return (ex, method, params) -> logger.error("Uncaught exception in async method {}.{}: {}", method.getDeclaringClass().getName(), method.getName(), ex.getMessage(), ex); + } } diff --git a/src/main/java/com/czertainly/core/event/transaction/DiscoveryFinishedEvent.java b/src/main/java/com/czertainly/core/event/transaction/DiscoveryFinishedEvent.java deleted file mode 100644 index 3956bea2..00000000 --- a/src/main/java/com/czertainly/core/event/transaction/DiscoveryFinishedEvent.java +++ /dev/null @@ -1,8 +0,0 @@ -package com.czertainly.core.event.transaction; - -import com.czertainly.core.tasks.ScheduledJobInfo; - -import java.util.UUID; - -public record DiscoveryFinishedEvent(UUID discoveryUuid, UUID loggedUserUuid, ScheduledJobInfo scheduledJobInfo) { -} diff --git a/src/main/java/com/czertainly/core/event/transaction/DiscoveryProgressEvent.java b/src/main/java/com/czertainly/core/event/transaction/DiscoveryProgressEvent.java deleted file mode 100644 index 2a5dfdf2..00000000 --- a/src/main/java/com/czertainly/core/event/transaction/DiscoveryProgressEvent.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.czertainly.core.event.transaction; - -import java.util.UUID; - -public record DiscoveryProgressEvent(UUID discoveryUuid, int totalCount, boolean downloading) { -} diff --git a/src/main/java/com/czertainly/core/messaging/listeners/EventListener.java b/src/main/java/com/czertainly/core/messaging/listeners/EventListener.java index b2816f4d..1aec533c 100644 --- a/src/main/java/com/czertainly/core/messaging/listeners/EventListener.java +++ b/src/main/java/com/czertainly/core/messaging/listeners/EventListener.java @@ -1,18 +1,11 @@ package com.czertainly.core.messaging.listeners; -import com.czertainly.api.exception.AttributeException; -import com.czertainly.api.exception.NotFoundException; -import com.czertainly.api.exception.RuleException; +import com.czertainly.api.model.core.auth.Resource; import com.czertainly.api.model.core.certificate.CertificateEvent; import com.czertainly.api.model.core.certificate.CertificateEventStatus; -import com.czertainly.api.model.core.other.ResourceEvent; import com.czertainly.core.messaging.configuration.RabbitMQConstants; import com.czertainly.core.messaging.model.EventMessage; import com.czertainly.core.service.CertificateEventHistoryService; -import com.czertainly.core.service.DiscoveryService; -import com.czertainly.core.tasks.ScheduledJobInfo; -import com.czertainly.core.util.AuthHelper; -import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; @@ -20,8 +13,6 @@ import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; import java.util.Objects; @Component @@ -31,19 +22,6 @@ public class EventListener { private static final Logger logger = LoggerFactory.getLogger(EventListener.class); private CertificateEventHistoryService certificateEventHistoryService; - private DiscoveryService discoveryService; - private AuthHelper authHelper; - private final ObjectMapper mapper = new ObjectMapper(); - - @Autowired - public void setAuthHelper(AuthHelper authHelper) { - this.authHelper = authHelper; - } - - @Autowired - public void setDiscoveryService(DiscoveryService discoveryService) { - this.discoveryService = discoveryService; - } @Autowired public void setCertificateEventHistoryService(CertificateEventHistoryService certificateEventHistoryService) { @@ -51,17 +29,11 @@ public void setCertificateEventHistoryService(CertificateEventHistoryService cer } @RabbitListener(queues = RabbitMQConstants.QUEUE_EVENTS_NAME, messageConverter = "jsonMessageConverter", concurrency = "3") - public void processMessage(EventMessage eventMessage) throws NotFoundException, CertificateException, NoSuchAlgorithmException, RuleException, AttributeException { - switch (eventMessage.getResource()) { - case CERTIFICATE -> - certificateEventHistoryService.addEventHistory(eventMessage.getResourceUUID(), CertificateEvent.findByCode(eventMessage.getEventName()), CertificateEventStatus.valueOf(eventMessage.getEventStatus()), eventMessage.getEventMessage(), eventMessage.getEventDetail()); - case DISCOVERY -> { - authHelper.authenticateAsUser(eventMessage.getUserUuid()); - if (Objects.equals(eventMessage.getEventName(), ResourceEvent.DISCOVERY_FINISHED.getCode())) { - discoveryService.evaluateDiscoveryTriggers(eventMessage.getResourceUUID(), eventMessage.getUserUuid(), mapper.convertValue(eventMessage.getEventData(), ScheduledJobInfo.class)); - } - } - default -> logger.warn("Event handling is supported only for certificates for now"); + public void processMessage(EventMessage eventMessage) { + if (Objects.requireNonNull(eventMessage.getResource()) == Resource.CERTIFICATE) { + certificateEventHistoryService.addEventHistory(eventMessage.getResourceUUID(), CertificateEvent.findByCode(eventMessage.getEventName()), CertificateEventStatus.valueOf(eventMessage.getEventStatus()), eventMessage.getEventMessage(), eventMessage.getEventDetail()); + } else { + logger.warn("Event handling is supported only for certificates for now"); } } diff --git a/src/main/java/com/czertainly/core/messaging/listeners/ValidationListener.java b/src/main/java/com/czertainly/core/messaging/listeners/ValidationListener.java index 73735813..0c65da25 100644 --- a/src/main/java/com/czertainly/core/messaging/listeners/ValidationListener.java +++ b/src/main/java/com/czertainly/core/messaging/listeners/ValidationListener.java @@ -6,12 +6,12 @@ import com.czertainly.core.messaging.configuration.RabbitMQConstants; import com.czertainly.core.messaging.model.ValidationMessage; import com.czertainly.core.service.handler.CertificateHandler; -import jakarta.transaction.Transactional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; import java.util.List; diff --git a/src/main/java/com/czertainly/core/messaging/producers/EventProducer.java b/src/main/java/com/czertainly/core/messaging/producers/EventProducer.java index 42ca68cd..ad1e361e 100644 --- a/src/main/java/com/czertainly/core/messaging/producers/EventProducer.java +++ b/src/main/java/com/czertainly/core/messaging/producers/EventProducer.java @@ -4,10 +4,8 @@ import com.czertainly.api.model.core.auth.Resource; import com.czertainly.api.model.core.certificate.CertificateEvent; import com.czertainly.api.model.core.certificate.CertificateEventStatus; -import com.czertainly.api.model.core.other.ResourceEvent; import com.czertainly.core.messaging.configuration.RabbitMQConstants; import com.czertainly.core.messaging.model.EventMessage; -import com.czertainly.core.tasks.ScheduledJobInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; @@ -44,10 +42,4 @@ public void produceCertificateStatusChangeEventMessage(final UUID certificateUUI produceMessage(eventMessage); } - public void produceDiscoveryFinishedEventMessage(final UUID discoveryUuid, final UUID userUuid, final ResourceEvent event, final ScheduledJobInfo scheduledJobInfo) { - - final EventMessage eventMessage = new EventMessage(Resource.DISCOVERY, discoveryUuid, event.getCode(), null, null, null, userUuid, scheduledJobInfo); - produceMessage(eventMessage); - } - } diff --git a/src/main/java/com/czertainly/core/service/DiscoveryService.java b/src/main/java/com/czertainly/core/service/DiscoveryService.java index 3638d2f4..095f3eb7 100644 --- a/src/main/java/com/czertainly/core/service/DiscoveryService.java +++ b/src/main/java/com/czertainly/core/service/DiscoveryService.java @@ -11,8 +11,6 @@ import com.czertainly.core.security.authz.SecurityFilter; import com.czertainly.core.tasks.ScheduledJobInfo; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; import java.util.List; import java.util.UUID; @@ -51,6 +49,4 @@ public interface DiscoveryService extends ResourceExtensionService { List getSearchableFieldInformationByGroup(); - void evaluateDiscoveryTriggers(UUID discoveryUuid, UUID userUuid, ScheduledJobInfo scheduledJobInfo) throws NotFoundException, RuleException, CertificateException, NoSuchAlgorithmException, AttributeException; - -} \ No newline at end of file +} diff --git a/src/main/java/com/czertainly/core/service/handler/CertificateHandler.java b/src/main/java/com/czertainly/core/service/handler/CertificateHandler.java index 9af007e9..0762c20b 100644 --- a/src/main/java/com/czertainly/core/service/handler/CertificateHandler.java +++ b/src/main/java/com/czertainly/core/service/handler/CertificateHandler.java @@ -21,10 +21,10 @@ import com.czertainly.core.dao.entity.workflows.TriggerHistory; import com.czertainly.core.dao.repository.CertificateRepository; import com.czertainly.core.dao.repository.DiscoveryCertificateRepository; +import com.czertainly.core.dao.repository.DiscoveryRepository; import com.czertainly.core.dao.repository.workflows.TriggerAssociationRepository; import com.czertainly.core.evaluator.CertificateRuleEvaluator; import com.czertainly.core.event.transaction.CertificateValidationEvent; -import com.czertainly.core.event.transaction.DiscoveryProgressEvent; import com.czertainly.core.messaging.model.ValidationMessage; import com.czertainly.core.messaging.producers.ValidationProducer; import com.czertainly.core.service.CertificateEventHistoryService; @@ -37,7 +37,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Propagation; @@ -57,7 +56,6 @@ public class CertificateHandler { private AttributeEngine attributeEngine; private CertificateRuleEvaluator certificateRuleEvaluator; - private ApplicationEventPublisher applicationEventPublisher; private ValidationProducer validationProducer; private TriggerService triggerService; @@ -66,6 +64,7 @@ public class CertificateHandler { private CertificateEventHistoryService certificateEventHistoryService; private CertificateRepository certificateRepository; + private DiscoveryRepository discoveryRepository; private DiscoveryCertificateRepository discoveryCertificateRepository; private TriggerAssociationRepository triggerAssociationRepository; @@ -79,11 +78,6 @@ public void setCertificateRuleEvaluator(CertificateRuleEvaluator certificateRule this.certificateRuleEvaluator = certificateRuleEvaluator; } - @Autowired - public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { - this.applicationEventPublisher = applicationEventPublisher; - } - @Autowired public void setValidationProducer(ValidationProducer validationProducer) { this.validationProducer = validationProducer; @@ -114,6 +108,11 @@ public void setCertificateRepository(CertificateRepository certificateRepository this.certificateRepository = certificateRepository; } + @Autowired + public void setDiscoveryRepository(DiscoveryRepository discoveryRepository) { + this.discoveryRepository = discoveryRepository; + } + @Autowired public void setDiscoveryCertificateRepository(DiscoveryCertificateRepository discoveryCertificateRepository) { this.discoveryCertificateRepository = discoveryCertificateRepository; @@ -128,7 +127,7 @@ public void setTriggerAssociationRepository(TriggerAssociationRepository trigger public void validate(Certificate certificate) { certificateService.validate(certificate); try { - if(certificate.getRaProfileUuid() != null) { + if (certificate.getRaProfileUuid() != null) { complianceService.checkComplianceOfCertificate(certificate); } } catch (ConnectorException e) { @@ -176,17 +175,20 @@ public void createDiscoveredCertificate(String batch, DiscoveryHistory discovery } } - applicationEventPublisher.publishEvent(new DiscoveryProgressEvent(discovery.getUuid(), discovery.getTotalCertificatesDiscovered(), true)); + // report progress + long currentCount = discoveryCertificateRepository.countByDiscovery(discovery); + discovery.setMessage(String.format("Downloaded %d %% of discovered certificates from provider (%d / %d)", (int) ((currentCount / (double) discovery.getConnectorTotalCertificatesDiscovered()) * 100), currentCount, discovery.getConnectorTotalCertificatesDiscovered())); + discoveryRepository.save(discovery); } @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.DEFAULT) public void processDiscoveredCertificate(int certIndex, int totalCount, DiscoveryHistory discovery, DiscoveryCertificate discoveryCertificate) { // Get X509 from discovered certificate and create certificate entity, do not save in database yet - Certificate entry; + Certificate certificate; X509Certificate x509Cert; try { x509Cert = CertificateUtil.parseCertificate(discoveryCertificate.getCertificateContent().getContent()); - entry = certificateService.createCertificateEntity(x509Cert); + certificate = certificateService.createCertificateEntity(x509Cert); } catch (Exception e) { logger.error("Unable to create certificate from discovery certificate with UUID {}: {}", discoveryCertificate.getUuid(), e.getMessage()); discoveryCertificate.setProcessed(true); @@ -213,60 +215,68 @@ public void processDiscoveredCertificate(int certIndex, int totalCount, Discover } try { - // First, check the triggers that have action with action type set to ignore - boolean ignored = false; - List ignoreTriggerHistories = new ArrayList<>(); - for (Trigger trigger : ignoreTriggers) { - TriggerHistory triggerHistory = triggerService.createTriggerHistory(OffsetDateTime.now(), trigger.getUuid(), discovery.getUuid(), null, discoveryCertificate.getUuid()); - if (certificateRuleEvaluator.evaluateRules(trigger.getRules(), entry, triggerHistory)) { - ignored = true; - triggerHistory.setConditionsMatched(true); - triggerHistory.setActionsPerformed(true); - break; - } else { - triggerHistory.setConditionsMatched(false); - triggerHistory.setActionsPerformed(false); - } - ignoreTriggerHistories.add(triggerHistory); - } + processTriggers(discovery.getUuid(), certificate, discoveryCertificate, ignoreTriggers, orderedTriggers); + } catch (RuleException e) { + logger.error("Unable to process trigger on certificate {} from discovery certificate with UUID {}. Message: {}", certificate.getUuid(), discoveryCertificate.getUuid(), e.getMessage()); + } - // If some trigger ignored this certificate, certificate is not saved and continue with next one - if (ignored) { - return; - } + updateDiscoveredCertificate(discovery, certificate, discoveryCertificate.getMeta()); + discoveryCertificate.setProcessed(true); + + discoveryCertificateRepository.save(discoveryCertificate); - // Save certificate to database - certificateService.updateCertificateEntity(entry); + // report progress + if (certIndex % 2 == 0) { + long currentCount = discoveryCertificateRepository.countByDiscoveryAndNewlyDiscoveredAndProcessed(discovery, true, true); + discovery.setMessage(String.format("Processed %d %% of newly discovered certificates (%d / %d)", (int) ((currentCount / (double) totalCount) * 100), currentCount, totalCount)); + discoveryRepository.save(discovery); + } + } - // update objectUuid of not ignored certs - for (TriggerHistory ignoreTriggerHistory : ignoreTriggerHistories) { - ignoreTriggerHistory.setObjectUuid(entry.getUuid()); + private void processTriggers(UUID discoveryUuid, Certificate certificate, DiscoveryCertificate discoveryCertificate, List ignoreTriggers, List orderedTriggers) throws RuleException { + // First, check the triggers that have action with action type set to ignore + boolean ignored = false; + List ignoreTriggerHistories = new ArrayList<>(); + for (Trigger trigger : ignoreTriggers) { + TriggerHistory triggerHistory = triggerService.createTriggerHistory(OffsetDateTime.now(), trigger.getUuid(), discoveryUuid, null, discoveryCertificate.getUuid()); + if (certificateRuleEvaluator.evaluateRules(trigger.getRules(), certificate, triggerHistory)) { + ignored = true; + triggerHistory.setConditionsMatched(true); + triggerHistory.setActionsPerformed(true); + break; + } else { + triggerHistory.setConditionsMatched(false); + triggerHistory.setActionsPerformed(false); } + ignoreTriggerHistories.add(triggerHistory); + } - // Evaluate rest of the triggers in given order - for (Trigger trigger : orderedTriggers) { - // Create trigger history entry - TriggerHistory triggerHistory = triggerService.createTriggerHistory(OffsetDateTime.now(), trigger.getUuid(), discovery.getUuid(), entry.getUuid(), discoveryCertificate.getUuid()); - // If rules are satisfied, perform defined actions - if (certificateRuleEvaluator.evaluateRules(trigger.getRules(), entry, triggerHistory)) { - triggerHistory.setConditionsMatched(true); - certificateRuleEvaluator.performActions(trigger, entry, triggerHistory); - triggerHistory.setActionsPerformed(triggerHistory.getRecords().isEmpty()); - } else { - triggerHistory.setConditionsMatched(false); - triggerHistory.setActionsPerformed(false); - } - } - } catch (RuleException e) { - logger.error("Unable to process trigger on certificate {} from discovery certificate with UUID {}. Message: {}", entry.getUuid(), discoveryCertificate.getUuid(), e.getMessage()); + // If some trigger ignored this certificate, certificate is not saved and continue with next one + if (ignored) { + return; } - updateDiscoveredCertificate(discovery, entry, discoveryCertificate.getMeta()); - discoveryCertificate.setProcessed(true); - discoveryCertificateRepository.save(discoveryCertificate); + // Save certificate to database + certificateService.updateCertificateEntity(certificate); - if (certIndex % 100 == 0) { - applicationEventPublisher.publishEvent(new DiscoveryProgressEvent(discovery.getUuid(), totalCount, false)); + // update objectUuid of not ignored certs + for (TriggerHistory ignoreTriggerHistory : ignoreTriggerHistories) { + ignoreTriggerHistory.setObjectUuid(certificate.getUuid()); + } + + // Evaluate rest of the triggers in given order + for (Trigger trigger : orderedTriggers) { + // Create trigger history entry + TriggerHistory triggerHistory = triggerService.createTriggerHistory(OffsetDateTime.now(), trigger.getUuid(), discoveryUuid, certificate.getUuid(), discoveryCertificate.getUuid()); + // If rules are satisfied, perform defined actions + if (certificateRuleEvaluator.evaluateRules(trigger.getRules(), certificate, triggerHistory)) { + triggerHistory.setConditionsMatched(true); + certificateRuleEvaluator.performActions(trigger, certificate, triggerHistory); + triggerHistory.setActionsPerformed(triggerHistory.getRecords().isEmpty()); + } else { + triggerHistory.setConditionsMatched(false); + triggerHistory.setActionsPerformed(false); + } } } diff --git a/src/main/java/com/czertainly/core/service/impl/DiscoveryServiceImpl.java b/src/main/java/com/czertainly/core/service/impl/DiscoveryServiceImpl.java index 393ebae1..5386b414 100644 --- a/src/main/java/com/czertainly/core/service/impl/DiscoveryServiceImpl.java +++ b/src/main/java/com/czertainly/core/service/impl/DiscoveryServiceImpl.java @@ -19,11 +19,9 @@ import com.czertainly.api.model.core.auth.Resource; import com.czertainly.api.model.core.connector.FunctionGroupCode; import com.czertainly.api.model.core.discovery.DiscoveryStatus; -import com.czertainly.api.model.core.other.ResourceEvent; import com.czertainly.api.model.core.search.FilterFieldSource; import com.czertainly.api.model.core.search.SearchFieldDataByGroupDto; import com.czertainly.api.model.core.search.SearchFieldDataDto; -import com.czertainly.api.model.scheduler.SchedulerJobExecutionStatus; import com.czertainly.core.attribute.engine.AttributeEngine; import com.czertainly.core.attribute.engine.records.ObjectAttributeContentInfo; import com.czertainly.core.comparator.SearchFieldDataComparator; @@ -34,13 +32,8 @@ import com.czertainly.core.dao.repository.workflows.TriggerAssociationRepository; import com.czertainly.core.enums.FilterField; import com.czertainly.core.event.transaction.CertificateValidationEvent; -import com.czertainly.core.event.transaction.DiscoveryFinishedEvent; -import com.czertainly.core.event.transaction.DiscoveryProgressEvent; -import com.czertainly.core.event.transaction.ScheduledJobFinishedEvent; import com.czertainly.core.messaging.model.NotificationRecipient; -import com.czertainly.core.messaging.producers.EventProducer; import com.czertainly.core.messaging.producers.NotificationProducer; -import com.czertainly.core.model.ScheduledTaskResult; import com.czertainly.core.model.auth.ResourceAction; import com.czertainly.core.security.authz.ExternalAuthorization; import com.czertainly.core.security.authz.SecuredUUID; @@ -66,12 +59,13 @@ import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Isolation; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.annotation.Propagation; import org.springframework.transaction.annotation.Transactional; -import org.springframework.transaction.event.TransactionPhase; -import org.springframework.transaction.event.TransactionalEventListener; +import org.springframework.transaction.support.DefaultTransactionDefinition; +import java.security.NoSuchAlgorithmException; import java.security.cert.X509Certificate; import java.util.*; import java.util.concurrent.*; @@ -91,8 +85,8 @@ public class DiscoveryServiceImpl implements DiscoveryService { public static final Semaphore downloadCertSemaphore = new Semaphore(10); public static final Semaphore processCertSemaphore = new Semaphore(10); - private EventProducer eventProducer; private NotificationProducer notificationProducer; + private PlatformTransactionManager transactionManager; private ApplicationEventPublisher applicationEventPublisher; private AttributeEngine attributeEngine; @@ -123,11 +117,6 @@ public void setAttributeEngine(AttributeEngine attributeEngine) { this.attributeEngine = attributeEngine; } - @Autowired - public void setEventProducer(EventProducer eventProducer) { - this.eventProducer = eventProducer; - } - @Autowired public void setDiscoveryCertificateHandler(CertificateHandler certificateHandler) { this.certificateHandler = certificateHandler; @@ -178,6 +167,11 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv this.applicationEventPublisher = applicationEventPublisher; } + @Autowired + public void setTransactionManager(PlatformTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + @Override @ExternalAuthorization(resource = Resource.DISCOVERY, action = ResourceAction.LIST) public DiscoveryResponseDto listDiscoveries(final SecurityFilter filter, final SearchRequestDto request) { @@ -342,81 +336,107 @@ public DiscoveryHistoryDetailDto createDiscovery(final DiscoveryDto request, fin @Override @Async + @Transactional(propagation = Propagation.NOT_SUPPORTED) @ExternalAuthorization(resource = Resource.DISCOVERY, action = ResourceAction.CREATE) public void runDiscoveryAsync(UUID discoveryUuid) { runDiscovery(discoveryUuid, null); } @Override + @Transactional(propagation = Propagation.NOT_SUPPORTED) @ExternalAuthorization(resource = Resource.DISCOVERY, action = ResourceAction.CREATE) public DiscoveryHistoryDetailDto runDiscovery(UUID discoveryUuid, ScheduledJobInfo scheduledJobInfo) { UUID loggedUserUuid = UUID.fromString(AuthHelper.getUserIdentification().getUuid()); // reload discovery modal with all association since it could be in separate transaction/session due to async + TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition()); DiscoveryHistory discovery = discoveryRepository.findWithTriggersByUuid(discoveryUuid); logger.info("Starting discovery: name={}, uuid={}", discovery.getName(), discovery.getUuid()); + + Connector connector; try { - Connector connector = connectorService.getConnectorEntity(SecuredUUID.fromString(discovery.getConnectorUuid().toString())); + connector = connectorService.getConnectorEntity(SecuredUUID.fromString(discovery.getConnectorUuid().toString())); + } catch (NotFoundException e) { + updateDiscoveryState(discovery, DiscoveryStatus.FAILED, DiscoveryStatus.FAILED, "Discovery does not have associated provider", 0, 0, null); + return finalizeDiscovery(discovery, loggedUserUuid, null, null); + } - // discover certificates by provider - DiscoveryProviderDto providerResponse = discoverCertificatesByProvider(discovery, connector, loggedUserUuid); - if (providerResponse == null) { - return discovery.mapToDto(); + // discover certificates by provider + DiscoveryProviderDto providerResponse; + try { + providerResponse = discoverCertificatesByProvider(discovery, connector, status); + if (status.isCompleted()) { + status = transactionManager.getTransaction(new DefaultTransactionDefinition()); } - - // download and create discovered certificates - List duplicateCertificates = downloadDiscoveredCertificates(discovery, connector, providerResponse); - - if (discoveryCertificateRepository.countByDiscovery(discovery) == 0 && providerResponse.getStatus() == DiscoveryStatus.FAILED) { - discovery.setStatus(DiscoveryStatus.FAILED); - discovery.setMessage("Discovery has failed on connector side with some certificates found, but none of them has been downloaded."); - notificationProducer.produceNotificationText(Resource.DISCOVERY, discovery.getUuid(), NotificationRecipient.buildUserNotificationRecipient(loggedUserUuid), String.format("Discovery %s has finished with status %s", discovery.getName(), discovery.getStatus()), discovery.getMessage()); - return discovery.mapToDto(); + } catch (DiscoveryException e) { + logger.error(e.getMessage()); + return finalizeDiscovery(discovery, loggedUserUuid, status, null); + } catch (Exception e) { + logger.error("Error in discovery '{}' at provider: {}", discovery.getName(), e.getMessage()); + updateDiscoveryState(discovery, DiscoveryStatus.FAILED, DiscoveryStatus.FAILED, "Error in provider: " + e.getMessage(), 0, 0, null); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); } + return finalizeDiscovery(discovery, loggedUserUuid, status, null); + } + discoveryRepository.save(discovery); + transactionManager.commit(status); - // process duplicates - for (DiscoveryProviderCertificateDataDto certificate : duplicateCertificates) { - try { - X509Certificate x509Cert = CertificateUtil.parseCertificate(certificate.getBase64Content()); - String fingerprint = CertificateUtil.getThumbprint(x509Cert.getEncoded()); - Certificate existingCertificate = certificateRepository.findByFingerprint(fingerprint).orElse(null); + // download and create discovered certificates + List duplicateCertificates = List.of(); + status = transactionManager.getTransaction(new DefaultTransactionDefinition()); + try { + duplicateCertificates = downloadDiscoveredCertificates(discovery, connector, providerResponse); + updateDiscoveryState(discovery, DiscoveryStatus.IN_PROGRESS, null, "Discovered certificates downloaded from provider", null, null, null); + } catch (DiscoveryException e) { + logger.error(e.getMessage()); + } + int downloadedCertificatesCount = (int) discoveryCertificateRepository.countByDiscovery(discovery); + discovery.setTotalCertificatesDiscovered(downloadedCertificatesCount); - if (existingCertificate == null) { - logger.warn("Could not update metadata for duplicate discovery certificate. Certificate with fingerprint {} not found.", fingerprint); - } else { - attributeEngine.updateMetadataAttributes(certificate.getMeta(), new ObjectAttributeContentInfo(discovery.getConnectorUuid(), Resource.CERTIFICATE, existingCertificate.getUuid(), Resource.DISCOVERY, discovery.getUuid(), discovery.getName())); - } - } catch (AttributeException e) { - logger.error("Could not update metadata for duplicate discovery certificate {}.", certificate.getUuid()); + // process duplicates + for (DiscoveryProviderCertificateDataDto certificate : duplicateCertificates) { + try { + X509Certificate x509Cert = CertificateUtil.parseCertificate(certificate.getBase64Content()); + String fingerprint = CertificateUtil.getThumbprint(x509Cert.getEncoded()); + Certificate existingCertificate = certificateRepository.findByFingerprint(fingerprint).orElse(null); + + if (existingCertificate == null) { + logger.warn("Could not update metadata for duplicate discovery certificate. Certificate with fingerprint {} not found.", fingerprint); + } else { + attributeEngine.updateMetadataAttributes(certificate.getMeta(), new ObjectAttributeContentInfo(discovery.getConnectorUuid(), Resource.CERTIFICATE, existingCertificate.getUuid(), Resource.DISCOVERY, discovery.getUuid(), discovery.getName())); } + } catch (AttributeException e) { + logger.error("Could not update metadata for duplicate discovery certificate {}.", certificate.getUuid()); + } catch (java.security.cert.CertificateException | NoSuchAlgorithmException e) { + logger.error("Could not parse and process duplicate discovery certificate {}: {}", certificate.getUuid(), e.getMessage()); } + } + String preProcessingMessage = discovery.getStatus() == DiscoveryStatus.IN_PROGRESS ? null : discovery.getMessage(); + long newlyDiscoveredCount = discoveryCertificateRepository.countByDiscoveryAndNewlyDiscovered(discovery, true); + if (newlyDiscoveredCount == 0) { + if (discovery.getStatus() == DiscoveryStatus.IN_PROGRESS) { + discovery.setStatus(DiscoveryStatus.COMPLETED); + } + return finalizeDiscovery(discovery, loggedUserUuid, status, preProcessingMessage); + } else { + discovery.setStatus(DiscoveryStatus.PROCESSING); + } - updateDiscovery(discovery, providerResponse, DiscoveryStatus.PROCESSING); - logger.debug("Going to process {} certificates", providerResponse.getTotalCertificatesDiscovered()); + discoveryRepository.save(discovery); + transactionManager.commit(status); - // Publish the custom event after transaction completion - applicationEventPublisher.publishEvent(new DiscoveryFinishedEvent(discovery.getUuid(), loggedUserUuid, scheduledJobInfo)); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - discovery.setStatus(DiscoveryStatus.FAILED); - discovery.setMessage(e.getMessage()); - discoveryRepository.save(discovery); - logger.error(e.getMessage()); - } catch (Exception e) { - discovery.setStatus(DiscoveryStatus.FAILED); - discovery.setMessage(e.getMessage()); - discoveryRepository.save(discovery); - logger.error(e.getMessage()); - } + logger.debug("Going to process {} certificates", newlyDiscoveredCount); - if (discovery.getStatus() != DiscoveryStatus.PROCESSING) { - notificationProducer.produceNotificationText(Resource.DISCOVERY, discovery.getUuid(), NotificationRecipient.buildUserNotificationRecipient(loggedUserUuid), String.format("Discovery %s has finished with status %s", discovery.getName(), discovery.getStatus()), discovery.getMessage()); - } + status = transactionManager.getTransaction(new DefaultTransactionDefinition()); + processDiscoveredCertificates(discovery); + discovery.setStatus(DiscoveryStatus.COMPLETED); - return discovery.mapToDto(); + applicationEventPublisher.publishEvent(new CertificateValidationEvent(null, discoveryUuid, discovery.getName(), null, null)); + return finalizeDiscovery(discovery, loggedUserUuid, status, preProcessingMessage); } - private DiscoveryProviderDto discoverCertificatesByProvider(final DiscoveryHistory discovery, final Connector connector, final UUID loggedUserUuid) throws ConnectorException, InterruptedException { + private DiscoveryProviderDto discoverCertificatesByProvider(final DiscoveryHistory discovery, final Connector connector, TransactionStatus status) throws InterruptedException, DiscoveryException, ConnectorException { DiscoveryRequestDto dtoRequest = new DiscoveryRequestDto(); dtoRequest.setName(discovery.getName()); dtoRequest.setKind(discovery.getKind()); @@ -432,8 +452,12 @@ private DiscoveryProviderDto discoverCertificatesByProvider(final DiscoveryHisto logger.debug("Discovery response: name={}, uuid={}, status={}, total={}", discovery.getName(), discovery.getUuid(), response.getStatus(), response.getTotalCertificatesDiscovered()); + if (response.getUuid() == null) { + updateDiscoveryState(discovery, DiscoveryStatus.FAILED, DiscoveryStatus.FAILED, "Discovery does not have associated discovery object at provider", 0, 0, null); + throw new DiscoveryException(discovery.getName(), discovery.getMessage()); + } + discovery.setDiscoveryConnectorReference(response.getUuid()); - discoveryRepository.save(discovery); DiscoveryDataRequestDto getRequest = new DiscoveryDataRequestDto(); getRequest.setName(response.getName()); @@ -441,57 +465,42 @@ private DiscoveryProviderDto discoverCertificatesByProvider(final DiscoveryHisto getRequest.setPageNumber(1); getRequest.setItemsPerPage(MAXIMUM_CERTIFICATES_PER_PAGE); - boolean waitForCompletion = checkForCompletion(response); boolean isReachedMaxTime = false; - int oldCertificateCount = 0; - while (waitForCompletion) { - if (discovery.getDiscoveryConnectorReference() == null) { - discovery.setStatus(DiscoveryStatus.FAILED); - discovery.setMessage("Discovery does not have associated connector"); - discoveryRepository.save(discovery); - return null; - } - logger.debug("Waiting {}ms for discovery to be completed: name={}, uuid={}", - SLEEP_TIME, discovery.getName(), discovery.getUuid()); + while (response.getStatus() == DiscoveryStatus.IN_PROGRESS) { + logger.debug("Waiting {}ms for discovery to be completed: name={}, uuid={}", SLEEP_TIME, discovery.getName(), discovery.getUuid()); Thread.sleep(SLEEP_TIME); - response = discoveryApiClient.getDiscoveryData(connector.mapToDto(), getRequest, response.getUuid()); + try { + response = discoveryApiClient.getDiscoveryData(connector.mapToDto(), getRequest, response.getUuid()); + } catch (ConnectorException e) { + updateDiscoveryState(discovery, DiscoveryStatus.FAILED, response.getStatus(), "Discovery has failed on connector side while waiting for completion.", 0, response.getTotalCertificatesDiscovered(), null); + throw new DiscoveryException(discovery.getName(), discovery.getMessage(), e); + } logger.debug("Discovery response: name={}, uuid={}, status={}, total={}", discovery.getName(), discovery.getUuid(), response.getStatus(), response.getTotalCertificatesDiscovered()); - if ((discovery.getStartTime().getTime() - new Date().getTime()) / 1000 > MAXIMUM_WAIT_TIME - && !isReachedMaxTime && oldCertificateCount == response.getTotalCertificatesDiscovered()) { + if (!isReachedMaxTime && (new Date().getTime() - discovery.getStartTime().getTime()) / 1000 > MAXIMUM_WAIT_TIME) { isReachedMaxTime = true; - discovery.setStatus(DiscoveryStatus.WARNING); - discovery.setMessage( - "Discovery " + discovery.getName() + " exceeded maximum time of " - + MAXIMUM_WAIT_TIME / (60 * 60) + " hours. There are no changes in number " + - "of certificates discovered. Please abort the discovery if the provider " + - "is stuck in state " + DiscoveryStatus.IN_PROGRESS.getLabel()); + + String message = "Discovery %s exceeded maximum time of %d hours. Please abort the discovery if the provider is stuck in state %s." + .formatted(discovery.getName(), (int) (MAXIMUM_WAIT_TIME / (60 * 60)), DiscoveryStatus.IN_PROGRESS.getLabel()); + updateDiscoveryState(discovery, DiscoveryStatus.WARNING, response.getStatus(), message, null, response.getTotalCertificatesDiscovered(), null); discoveryRepository.save(discovery); + transactionManager.commit(status); } - - oldCertificateCount = response.getTotalCertificatesDiscovered(); - waitForCompletion = checkForCompletion(response); } - discovery.setTotalCertificatesDiscovered(response.getTotalCertificatesDiscovered()); - discovery.setConnectorTotalCertificatesDiscovered(response.getTotalCertificatesDiscovered()); - discovery.setConnectorStatus(response.getStatus()); if (response.getTotalCertificatesDiscovered() == 0 && response.getStatus() == DiscoveryStatus.FAILED) { - discovery.setStatus(DiscoveryStatus.FAILED); - discovery.setMessage("Discovery has failed on connector side without any certificates found."); - notificationProducer.produceNotificationText(Resource.DISCOVERY, discovery.getUuid(), NotificationRecipient.buildUserNotificationRecipient(loggedUserUuid), String.format("Discovery %s has finished with status %s", discovery.getName(), discovery.getStatus()), discovery.getMessage()); - discoveryRepository.save(discovery); - return null; + updateDiscoveryState(discovery, DiscoveryStatus.FAILED, response.getStatus(), "Discovery has failed on connector side without any certificates found.", 0, response.getTotalCertificatesDiscovered(), response.getMeta()); + throw new DiscoveryException(discovery.getName(), discovery.getMessage()); } - discoveryRepository.save(discovery); + updateDiscoveryState(discovery, DiscoveryStatus.IN_PROGRESS, response.getStatus(), "Discovery completed at provider", null, response.getTotalCertificatesDiscovered(), response.getMeta()); return response; } - private List downloadDiscoveredCertificates(final DiscoveryHistory discovery, final Connector connector, DiscoveryProviderDto response) { + private List downloadDiscoveredCertificates(final DiscoveryHistory discovery, final Connector connector, DiscoveryProviderDto response) throws DiscoveryException { int currentPage = 1; int currentTotal = 0; @@ -504,22 +513,29 @@ private List downloadDiscoveredCertificates List> futures = new ArrayList<>(); Set uniqueCertificateContents = new HashSet<>(); List duplicateCertificates = new ArrayList<>(); + try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) { while (currentTotal < response.getTotalCertificatesDiscovered()) { getRequest.setPageNumber(currentPage); getRequest.setItemsPerPage(MAXIMUM_CERTIFICATES_PER_PAGE); - response = discoveryApiClient.getDiscoveryData(connector.mapToDto(), getRequest, response.getUuid()); + try { + response = discoveryApiClient.getDiscoveryData(connector.mapToDto(), getRequest, response.getUuid()); + } catch (ConnectorException e) { + handleDiscoveredCertificatesBatch(futures, discovery.getName()); + updateDiscoveryState(discovery, DiscoveryStatus.FAILED, response.getStatus(), "Discovery has failed on connector side while downloading certificates.", null, null, null); + throw new DiscoveryException(discovery.getName(), discovery.getMessage(), e); + } if (response.getCertificateData().isEmpty()) { - discovery.setMessage(String.format("Retrieved only %d certificates but provider discovered %d " + - "certificates in total.", currentTotal, response.getTotalCertificatesDiscovered())); - break; + handleDiscoveredCertificatesBatch(futures, discovery.getName()); + String message = String.format("Retrieved only %d certificates but provider discovered %d certificates in total.", currentTotal, response.getTotalCertificatesDiscovered()); + updateDiscoveryState(discovery, DiscoveryStatus.WARNING, response.getStatus(), message, null, null, null); + throw new DiscoveryException(discovery.getName(), discovery.getMessage()); } if (response.getCertificateData().size() > MAXIMUM_CERTIFICATES_PER_PAGE) { - updateDiscovery(discovery, response, DiscoveryStatus.FAILED); - logger.error("Too many content in response. Maximum processable is {}.", MAXIMUM_CERTIFICATES_PER_PAGE); - throw new InterruptedException( - "Too many content in response to process. Maximum processable is " + MAXIMUM_CERTIFICATES_PER_PAGE); + handleDiscoveredCertificatesBatch(futures, discovery.getName()); + updateDiscoveryState(discovery, DiscoveryStatus.FAILED, response.getStatus(), "Too many certificates (%d) in response at page %d. Maximum processable is %d.".formatted(response.getCertificateData().size(), currentPage, MAXIMUM_CERTIFICATES_PER_PAGE), null, null, null); + throw new DiscoveryException(discovery.getName(), discovery.getMessage()); } futures.add(downloadDiscoveredCertificatesBatchAsync(discovery, response, connector, uniqueCertificateContents, duplicateCertificates, executor, currentPage)); @@ -528,28 +544,15 @@ private List downloadDiscoveredCertificates currentTotal += response.getCertificateData().size(); if (futures.size() >= MAXIMUM_PARALLELISM) { - logger.debug("Waiting for {} download tasks for discovery {}", futures.size(), discovery.getName()); - for (Future future : futures) { - future.get(); - } - logger.debug("{} download tasks for discovery {} finished", futures.size(), discovery.getName()); - futures.clear(); + handleDiscoveredCertificatesBatch(futures, discovery.getName()); } } // Wait for all tasks to complete - logger.debug("Waiting for {} download tasks for discovery {}", futures.size(), discovery.getName()); - for (Future future : futures) { - future.get(); - } - logger.debug("{} download tasks for discovery {} finished", futures.size(), discovery.getName()); - } catch (Exception e) { - logger.error("An error occurred during downloading discovered certificate of discovery {}: {}", discovery.getName(), e.getMessage(), e); - if (e instanceof InterruptedException) { - Thread.currentThread().interrupt(); + if (!futures.isEmpty()) { + handleDiscoveredCertificatesBatch(futures, discovery.getName()); } } - return duplicateCertificates; } @@ -590,10 +593,10 @@ private Future downloadDiscoveredCertificatesBatchAsync(final DiscoveryHistor certificateHandler.createDiscoveredCertificate(String.valueOf(currentPage), discovery, discoveredCertificates); } catch (InterruptedException e) { + logger.error("Downloading batch {} of discovered certificates for discovery {} interrupted.", currentPage, discovery.getName(), e); Thread.currentThread().interrupt(); - logger.error("Downloading batch {} of discovered certificates for discovery {} interrupted.", currentPage, discovery.getName()); } catch (Exception e) { - logger.error("Downloading batch {} of discovered certificates for discovery {} failed.", currentPage, discovery.getName()); + logger.error("Downloading batch {} of discovered certificates for discovery {} failed.", currentPage, discovery.getName(), e); } finally { logger.trace("Downloading batch {} of discovered certificates for discovery {} finalized. Released semaphore.", currentPage, discovery.getName()); downloadCertSemaphore.release(); @@ -601,25 +604,99 @@ private Future downloadDiscoveredCertificatesBatchAsync(final DiscoveryHistor }); } - private void updateDiscovery(DiscoveryHistory modal, DiscoveryProviderDto response, DiscoveryStatus status) throws AttributeException { - modal.setStatus(status); - modal.setTotalCertificatesDiscovered(status == DiscoveryStatus.FAILED ? response.getTotalCertificatesDiscovered() : (int) discoveryCertificateRepository.countByDiscovery(modal)); - attributeEngine.updateMetadataAttributes(response.getMeta(), new ObjectAttributeContentInfo(modal.getConnectorUuid(), Resource.DISCOVERY, modal.getUuid())); - discoveryRepository.save(modal); + private void handleDiscoveredCertificatesBatch(List> futures, String discoveryName) { + logger.debug("Waiting for {} download tasks for discovery {}", futures.size(), discoveryName); + for (Future future : futures) { + try { + future.get(); + } catch (Exception e) { + logger.error("An error occurred during downloading discovered certificate of discovery {}: {}", discoveryName, e.getMessage(), e); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } + } + } + logger.debug("{} download tasks for discovery {} finished", futures.size(), discoveryName); + futures.clear(); } - @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) - public void handleDiscoveryFinishedEvent(DiscoveryFinishedEvent event) { - eventProducer.produceDiscoveryFinishedEventMessage( - event.discoveryUuid(), - event.loggedUserUuid(), - ResourceEvent.DISCOVERY_FINISHED, - event.scheduledJobInfo() - ); + private void processDiscoveredCertificates(DiscoveryHistory discovery) { + // Get newly discovered certificates + List discoveredCertificates = discoveryCertificateRepository.findByDiscoveryAndNewlyDiscovered(discovery, true, Pageable.unpaged()); + + logger.debug("Number of discovered certificates to process: {}", discoveredCertificates.size()); + + if (discoveredCertificates.isEmpty()) return; + + // For each discovered certificate and for each found trigger, check if it satisfies rules defined by the trigger and perform actions accordingly + AtomicInteger index = new AtomicInteger(0); + try (ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor()) { + SecurityContext securityContext = SecurityContextHolder.getContext(); + DelegatingSecurityContextExecutor executor = new DelegatingSecurityContextExecutor(virtualThreadExecutor, securityContext); + CompletableFuture> future = discoveredCertificates.stream().collect( + ParallelCollectors.parallel( + discoveryCertificate -> { + int certIndex; + try { + certIndex = index.incrementAndGet(); + logger.trace("Waiting to process cert {} of discovered certificates for discovery {}.", certIndex, discovery.getName()); + processCertSemaphore.acquire(); + logger.trace("Processing cert {} of discovered certificates for discovery {}.", certIndex, discovery.getName()); + + certificateHandler.processDiscoveredCertificate(certIndex, discoveredCertificates.size(), discovery, discoveryCertificate); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Thread {} processing cert {} of discovered certificates interrupted.", Thread.currentThread().getName(), index.get()); + } catch (Exception e) { + logger.error("Unable to process certificate {}: {}", discoveryCertificate.getCommonName(), e.getMessage(), e); + } finally { + logger.trace("Thread {} processing cert {} of discovered certificates finalized. Released semaphore.", Thread.currentThread().getName(), index.get()); + processCertSemaphore.release(); + } + return null; // Return null to satisfy the return type + }, + executor, + MAXIMUM_PARALLELISM + ) + ); + + // Wait for all tasks to complete + future.join(); + } } - private boolean checkForCompletion(DiscoveryProviderDto response) { - return response.getStatus() == DiscoveryStatus.IN_PROGRESS; + private void updateDiscoveryState(DiscoveryHistory discovery, DiscoveryStatus status, DiscoveryStatus connectorStatus, String message, Integer totalCertificatesDiscovered, Integer connectorTotalCertificatesDiscovered, List metadata) { + discovery.setStatus(status); + if (connectorStatus != null) discovery.setConnectorStatus(connectorStatus); + if (message != null) discovery.setMessage(message); + if (totalCertificatesDiscovered != null) discovery.setTotalCertificatesDiscovered(totalCertificatesDiscovered); + if (connectorTotalCertificatesDiscovered != null) { + discovery.setConnectorTotalCertificatesDiscovered(connectorTotalCertificatesDiscovered); + } + if (metadata != null && !metadata.isEmpty()) { + try { + attributeEngine.updateMetadataAttributes(metadata, new ObjectAttributeContentInfo(discovery.getConnectorUuid(), Resource.DISCOVERY, discovery.getUuid())); + } catch (AttributeException e) { + logger.warn("Failed to serialize discovery metadata"); + } + } + } + + private DiscoveryHistoryDetailDto finalizeDiscovery(DiscoveryHistory discovery, UUID loggedUserUuid, TransactionStatus status, String preProcessingMessage) { + if (status == null || status.isCompleted()) { + status = transactionManager.getTransaction(new DefaultTransactionDefinition()); + } + + discovery.setEndTime(new Date()); + if (discovery.getStatus() == DiscoveryStatus.COMPLETED) { + discovery.setMessage(preProcessingMessage == null ? "Discovery completed successfully" : "Discovery completed. " + preProcessingMessage); + } + + discoveryRepository.save(discovery); + transactionManager.commit(status); + + notificationProducer.produceNotificationText(Resource.DISCOVERY, discovery.getUuid(), NotificationRecipient.buildUserNotificationRecipient(loggedUserUuid), String.format("Discovery %s has finished with status %s", discovery.getName(), discovery.getStatus().getLabel()), discovery.getMessage()); + return discovery.mapToDto(); } @Override @@ -656,79 +733,4 @@ public List getSearchableFieldInformationByGroup() { logger.debug("Searchable Fields by Groups: {}", searchFieldDataByGroupDtos); return searchFieldDataByGroupDtos; } - - @Override - public void evaluateDiscoveryTriggers(UUID discoveryUuid, UUID userUuid, ScheduledJobInfo scheduledJobInfo) { - // Get newly discovered certificates - DiscoveryHistory discovery = discoveryRepository.findWithTriggersByUuid(discoveryUuid); - List discoveredCertificates = discoveryCertificateRepository.findByDiscoveryAndNewlyDiscovered(discovery, true, Pageable.unpaged()); - - logger.debug("Number of discovered certificates to process: {}", discoveredCertificates.size()); - - if (!discoveredCertificates.isEmpty()) { - // For each discovered certificate and for each found trigger, check if it satisfies rules defined by the trigger and perform actions accordingly - AtomicInteger index = new AtomicInteger(0); - try (ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor()) { - SecurityContext securityContext = SecurityContextHolder.getContext(); - DelegatingSecurityContextExecutor executor = new DelegatingSecurityContextExecutor(virtualThreadExecutor, securityContext); - CompletableFuture> future = discoveredCertificates.stream().collect( - ParallelCollectors.parallel( - discoveryCertificate -> { - int certIndex; - try { - certIndex = index.incrementAndGet(); - logger.trace("Waiting to process cert {} of discovered certificates for discovery {}.", certIndex, discovery.getName()); - processCertSemaphore.acquire(); - logger.trace("Processing cert {} of discovered certificates for discovery {}.", certIndex, discovery.getName()); - - certificateHandler.processDiscoveredCertificate(certIndex, discoveredCertificates.size(), discovery, discoveryCertificate); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.error("Thread {} processing cert {} of discovered certificates interrupted.", Thread.currentThread().getName(), index.get()); - } catch (Exception e) { - logger.error("Unable to process certificate {}: {}", discoveryCertificate.getCommonName(), e.getMessage(), e); - } finally { - logger.trace("Thread {} processing cert {} of discovered certificates finalized. Released semaphore.", Thread.currentThread().getName(), index.get()); - processCertSemaphore.release(); - } - return null; // Return null to satisfy the return type - }, - executor, - MAXIMUM_PARALLELISM - ) - ); - - // Wait for all tasks to complete - future.join(); - } - } - - discovery.setStatus(DiscoveryStatus.COMPLETED); - discovery.setEndTime(new Date()); - discoveryRepository.save(discovery); - - notificationProducer.produceNotificationText(Resource.DISCOVERY, discovery.getUuid(), NotificationRecipient.buildUserNotificationRecipient(userUuid), String.format("Discovery %s has finished with status %s", discovery.getName(), discovery.getStatus()), discovery.getMessage()); - if (scheduledJobInfo != null) { - applicationEventPublisher.publishEvent(new ScheduledJobFinishedEvent(scheduledJobInfo, new ScheduledTaskResult(discovery.getStatus() == DiscoveryStatus.FAILED ? SchedulerJobExecutionStatus.FAILED : SchedulerJobExecutionStatus.SUCCESS, discovery.getMessage(), Resource.DISCOVERY, discovery.getUuid().toString()))); - } - applicationEventPublisher.publishEvent(new CertificateValidationEvent(null, discoveryUuid, discovery.getName(), null, null)); - } - - @Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.DEFAULT) - @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) - public void handleDiscoveryProgressEvent(DiscoveryProgressEvent event) { - logger.debug("Handling discovery progress event: {}", event); - DiscoveryHistory discoveryHistory = discoveryRepository.findByUuid(event.discoveryUuid()).orElse(null); - if (discoveryHistory != null) { - long currentCount; - if (event.downloading()) { - currentCount = discoveryCertificateRepository.countByDiscovery(discoveryHistory); - discoveryHistory.setMessage(String.format("Downloaded %d %% of discovered certificates from provider (%d / %d)", (int) ((currentCount / (double) event.totalCount()) * 100), currentCount, event.totalCount())); - } else { - currentCount = discoveryCertificateRepository.countByDiscoveryAndNewlyDiscoveredAndProcessed(discoveryHistory, true, true); - discoveryHistory.setMessage(String.format("Processed %d %% of newly discovered certificates (%d / %d)", (int) ((currentCount / (double) event.totalCount()) * 100), currentCount, event.totalCount())); - } - discoveryRepository.save(discoveryHistory); - } - } } diff --git a/src/main/java/com/czertainly/core/tasks/DiscoveryCertificateTask.java b/src/main/java/com/czertainly/core/tasks/DiscoveryCertificateTask.java index ffbd700b..fa9f7adf 100644 --- a/src/main/java/com/czertainly/core/tasks/DiscoveryCertificateTask.java +++ b/src/main/java/com/czertainly/core/tasks/DiscoveryCertificateTask.java @@ -32,7 +32,7 @@ public class DiscoveryCertificateTask implements ScheduledJobTask { private DiscoveryService discoveryService; - private ObjectMapper mapper = new ObjectMapper(); + private final ObjectMapper mapper = new ObjectMapper(); private PlatformTransactionManager transactionManager; @@ -87,11 +87,8 @@ public ScheduledTaskResult performJob(final ScheduledJobInfo scheduledJobInfo, f return new ScheduledTaskResult(SchedulerJobExecutionStatus.FAILED, errorMessage, discovery != null ? Resource.DISCOVERY : null, discovery != null ? discovery.getUuid() : null); } - // After the transaction with new discovery persisting commits, run discovery - status = transactionManager.getTransaction(new DefaultTransactionDefinition()); + // After the discovery is created and commited, run discovery discovery = discoveryService.runDiscovery(UUID.fromString(discovery.getUuid()), scheduledJobInfo); - transactionManager.commit(status); - if (discovery.getStatus() != DiscoveryStatus.PROCESSING) { return new ScheduledTaskResult(discovery.getStatus() == DiscoveryStatus.FAILED ? SchedulerJobExecutionStatus.FAILED : SchedulerJobExecutionStatus.SUCCESS, discovery.getMessage(), Resource.DISCOVERY, discovery.getUuid()); } diff --git a/src/test/java/com/czertainly/core/service/SchedulerServiceTest.java b/src/test/java/com/czertainly/core/service/SchedulerServiceTest.java index 6d42c287..187c8de7 100644 --- a/src/test/java/com/czertainly/core/service/SchedulerServiceTest.java +++ b/src/test/java/com/czertainly/core/service/SchedulerServiceTest.java @@ -28,7 +28,6 @@ import com.czertainly.core.security.authz.SecuredUUID; import com.czertainly.core.security.authz.SecurityFilter; import com.czertainly.core.tasks.DiscoveryCertificateTask; -import com.czertainly.core.tasks.ScheduledJobInfo; import com.czertainly.core.util.BaseSpringBootTest; import com.czertainly.core.util.MetaDefinitions; import com.github.tomakehurst.wiremock.WireMockServer; @@ -44,7 +43,7 @@ import java.util.List; import java.util.UUID; -public class SchedulerServiceTest extends BaseSpringBootTest { +class SchedulerServiceTest extends BaseSpringBootTest { @Autowired private SchedulerService schedulerService; @@ -87,7 +86,7 @@ public class SchedulerServiceTest extends BaseSpringBootTest { private Connector2FunctionGroupRepository connector2FunctionGroupRepository; @Test - public void runScheduledDiscoveryWithTriggers() throws AlreadyExistException, NotFoundException, AttributeException, SchedulerException, InterruptedException, CertificateException, NoSuchAlgorithmException, RuleException, IOException { + void runScheduledDiscoveryWithTriggers() throws AlreadyExistException, NotFoundException, AttributeException, SchedulerException, InterruptedException, CertificateException, NoSuchAlgorithmException, RuleException, IOException { // register custom attribute CustomAttribute certificateDomainAttr = new CustomAttribute(); certificateDomainAttr.setUuid(UUID.randomUUID().toString()); @@ -199,6 +198,7 @@ public void runScheduledDiscoveryWithTriggers() throws AlreadyExistException, No String discoveredCertificatesMockResponse = """ { + "uuid": "4bd64640-be29-4e14-aad8-5c0ffa55c5bd", "status": "completed", "totalCertificatesDiscovered": 2, "certificateData": [ @@ -227,34 +227,20 @@ public void runScheduledDiscoveryWithTriggers() throws AlreadyExistException, No .willReturn(WireMock.okJson(discoveredCertificatesMockResponse))); mockServer.stubFor(WireMock - .post(WireMock.urlPathMatching("/v1/discoveryProvider/discover/*")) + .post(WireMock.urlPathMatching("/v1/discoveryProvider/discover/4bd64640-be29-4e14-aad8-5c0ffa55c5bd")) .willReturn(WireMock.okJson(discoveredCertificatesMockResponse))); - TestTransaction.start(); - schedulerService.runScheduledJob(jobName); - TestTransaction.flagForCommit(); - TestTransaction.end(); - List discoveries = discoveryRepository.findAll(); - Assertions.assertEquals(1, discoveries.size()); DiscoveryHistory discovery = discoveries.getFirst(); - Assertions.assertEquals(DiscoveryStatus.PROCESSING, discovery.getStatus()); + Assertions.assertEquals(DiscoveryStatus.COMPLETED, discovery.getStatus()); ScheduledJobHistory jobHistory = scheduledJobHistoryRepository.findTopByScheduledJobUuidOrderByJobExecutionDesc(scheduledJobEntity.getUuid()); Assertions.assertNotNull(jobHistory); - TestTransaction.start(); - - // run manually processing of discovered certificates since RabbitMQ is not available - discoveryService.evaluateDiscoveryTriggers(discovery.getUuid(), UUID.randomUUID(), new ScheduledJobInfo(scheduledJobEntity.getJobName(), scheduledJobEntity.getUuid(), jobHistory.getUuid())); - - TestTransaction.flagForCommit(); - TestTransaction.end(); - ScheduledJobHistoryResponseDto jobHistoryResponse = schedulerService.getScheduledJobHistory(SecurityFilter.create(), new PaginationRequestDto(), scheduledJobEntity.getUuid().toString()); Assertions.assertEquals(1, jobHistoryResponse.getScheduledJobHistory().size());