Skip to content

Commit

Permalink
[2890] Update the new portal version before sending notifications
Browse files Browse the repository at this point in the history
Rework the PortalEventProcessor and IPortalEventHandler so that:

* handlers only compute the response (which makes them simpler);
* the PortalEventProcessor handles the response by always
  updating/saving the new portal version (if there is one) before
  emiting notifications.

This ensures that "downstream" representations like the
Representations view see the correct, updated version of the Portal
when the notification triggers their own refresh.

This also centralizes the technical details about handling reactive
streams correctly (instead of counting on each and every handler to
e.g. always publish exactly one payload in all their code paths), and
allows for "middleware" handlers like the example CountingHandler to
be written once and apply to all handlers.

Bug: #2890
Signed-off-by: Pierre-Charles David <pierre-charles.david@obeo.fr>
  • Loading branch information
pcdavid committed Feb 9, 2024
1 parent 58d4e22 commit 9e6e4ca
Show file tree
Hide file tree
Showing 15 changed files with 249 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
import org.eclipse.sirius.components.collaborative.dto.RenameRepresentationInput;
import org.eclipse.sirius.components.collaborative.portals.api.IPortalEventHandler;
import org.eclipse.sirius.components.collaborative.portals.api.IPortalInput;
import org.eclipse.sirius.components.collaborative.portals.api.PortalContext;
import org.eclipse.sirius.components.collaborative.portals.api.PortalRequest;
import org.eclipse.sirius.components.collaborative.portals.api.PortalResponse;
import org.eclipse.sirius.components.collaborative.portals.dto.PortalRefreshedEventPayload;
import org.eclipse.sirius.components.collaborative.portals.dto.RenamePortalInput;
import org.eclipse.sirius.components.collaborative.portals.handlers.CountingEventHandler;
import org.eclipse.sirius.components.collaborative.portals.services.ICollaborativePortalMessageService;
import org.eclipse.sirius.components.collaborative.portals.services.PortalServices;
import org.eclipse.sirius.components.core.api.ErrorPayload;
import org.eclipse.sirius.components.core.api.IEditingContext;
import org.eclipse.sirius.components.core.api.IInput;
import org.eclipse.sirius.components.core.api.IPayload;
Expand All @@ -38,6 +42,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micrometer.core.instrument.MeterRegistry;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
Expand Down Expand Up @@ -68,14 +73,20 @@ public class PortalEventProcessor implements IPortalEventProcessor {

private final Many<IPayload> sink = Sinks.many().multicast().directBestEffort();

private final ICollaborativePortalMessageService messageService;

private final MeterRegistry meterRegistry;

public PortalEventProcessor(IEditingContext editingContext, IRepresentationSearchService representationSearchService, IRepresentationPersistenceService representationPersistenceService,
List<IPortalEventHandler> portalEventHandlers, ISubscriptionManager subscriptionManager, Portal portal) {
ISubscriptionManager subscriptionManager, MeterRegistry meterRegistry, PortalEventProcessorConfiguration configuration) {
this.editingContext = Objects.requireNonNull(editingContext);
this.representationSearchService = Objects.requireNonNull(representationSearchService);
this.representationPersistenceService = Objects.requireNonNull(representationPersistenceService);
this.portalEventHandlers = Objects.requireNonNull(portalEventHandlers);
this.subscriptionManager = Objects.requireNonNull(subscriptionManager);
this.currentPortal = Objects.requireNonNull(portal);
this.meterRegistry = Objects.requireNonNull(meterRegistry);
this.portalEventHandlers = Objects.requireNonNull(configuration.portalEventHandlers());
this.currentPortal = Objects.requireNonNull(configuration.portal());
this.messageService = Objects.requireNonNull(configuration.messageService());
}

@Override
Expand All @@ -92,19 +103,30 @@ public ISubscriptionManager getSubscriptionManager() {
public void handle(One<IPayload> payloadSink, Many<ChangeDescription> changeDescriptionSink, IRepresentationInput representationInput) {
IRepresentationInput effectiveInput = representationInput;
if (representationInput instanceof RenameRepresentationInput renameRepresentationInput) {
effectiveInput = new RenamePortalInput(renameRepresentationInput.id(),
renameRepresentationInput.editingContextId(),
renameRepresentationInput.representationId(),
renameRepresentationInput.newLabel());
effectiveInput = new RenamePortalInput(renameRepresentationInput.id(), renameRepresentationInput.editingContextId(), renameRepresentationInput.representationId(),
renameRepresentationInput.newLabel());
}

if (effectiveInput instanceof IPortalInput portalInput) {
Optional<IPortalEventHandler> optionalPortalEventHandler = this.portalEventHandlers.stream().filter(handler -> handler.canHandle(portalInput)).findFirst();
PortalRequest request = new PortalRequest(this.representationSearchService, this.editingContext, this.currentPortal, portalInput);
Optional<IPortalEventHandler> optionalPortalEventHandler = this.portalEventHandlers.stream().filter(handler -> handler.canHandle(request)).findFirst();
if (optionalPortalEventHandler.isPresent()) {
IPortalEventHandler portalEventHandler = optionalPortalEventHandler.get();
PortalContext context = new PortalContext(this.representationSearchService, this.editingContext, this.currentPortal, portalInput);
portalEventHandler.handle(payloadSink, changeDescriptionSink, context);
context.getNextPortal().ifPresent(newPortal -> this.updatePortal(portalInput, newPortal));
PortalResponse response = new PortalResponse();
try {
new CountingEventHandler(portalEventHandler, this.meterRegistry).handle(request, response);
} finally {
response.getNextPortal().ifPresent(newPortal -> this.updatePortal(portalInput, newPortal));

if (response.getPayload() != null) {
payloadSink.tryEmitValue(response.getPayload());
} else {
var errorMessage = this.messageService.unhandledInput(portalInput.getClass().getSimpleName(), portalEventHandler.getClass().getSimpleName());
payloadSink.tryEmitValue(new ErrorPayload(portalInput.id(), errorMessage));
}

response.getChangeDescriptions().forEach(changeDescriptionSink::tryEmitNext);
}
} else {
this.logger.warn("No handler found for event: {}", portalInput);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*******************************************************************************
* Copyright (c) 2024 Obeo.
* This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Obeo - initial API and implementation
*******************************************************************************/
package org.eclipse.sirius.components.collaborative.portals;

import java.util.List;

import org.eclipse.sirius.components.collaborative.portals.api.IPortalEventHandler;
import org.eclipse.sirius.components.collaborative.portals.services.ICollaborativePortalMessageService;
import org.eclipse.sirius.components.portals.Portal;

/**
* Bundles Portal-specific input arguments for {@link PortalEventProcessor}.
*
* @author pcdavid
*/
public record PortalEventProcessorConfiguration(List<IPortalEventHandler> portalEventHandlers, Portal portal, ICollaborativePortalMessageService messageService) {

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.eclipse.sirius.components.collaborative.api.ISubscriptionManagerFactory;
import org.eclipse.sirius.components.collaborative.portals.api.IPortalEventHandler;
import org.eclipse.sirius.components.collaborative.portals.api.PortalConfiguration;
import org.eclipse.sirius.components.collaborative.portals.services.ICollaborativePortalMessageService;
import org.eclipse.sirius.components.core.api.IEditingContext;
import org.eclipse.sirius.components.portals.Portal;
import org.springframework.stereotype.Service;

import io.micrometer.core.instrument.MeterRegistry;

/**
* Used to create the portal event processors.
*
Expand All @@ -43,12 +46,18 @@ public class PortalEventProcessorFactory implements IRepresentationEventProcesso

private final List<IPortalEventHandler> portalEventHandlers;

private final ICollaborativePortalMessageService messageService;

private final MeterRegistry meterRegistry;

public PortalEventProcessorFactory(IRepresentationSearchService representationSearchService, IRepresentationPersistenceService representationPersistenceService,
ISubscriptionManagerFactory subscriptionManagerFactory, List<IPortalEventHandler> portalEventHandlers) {
ISubscriptionManagerFactory subscriptionManagerFactory, List<IPortalEventHandler> portalEventHandlers, ICollaborativePortalMessageService messageService, MeterRegistry meterRegistry) {
this.representationSearchService = Objects.requireNonNull(representationSearchService);
this.representationPersistenceService = Objects.requireNonNull(representationPersistenceService);
this.subscriptionManagerFactory = Objects.requireNonNull(subscriptionManagerFactory);
this.portalEventHandlers = Objects.requireNonNull(portalEventHandlers);
this.messageService = Objects.requireNonNull(messageService);
this.meterRegistry = Objects.requireNonNull(meterRegistry);
}

@Override
Expand All @@ -62,11 +71,12 @@ public <T extends IRepresentationEventProcessor> Optional<T> createRepresentatio
if (IPortalEventProcessor.class.isAssignableFrom(representationEventProcessorClass) && configuration instanceof PortalConfiguration portalConfiguration) {
var optionalPortal = this.representationSearchService.findById(editingContext, portalConfiguration.getId(), Portal.class);
if (optionalPortal.isPresent()) {
Portal portal = optionalPortal.get();
var portalEventProcessor = new PortalEventProcessor(editingContext, this.representationSearchService, this.representationPersistenceService, this.portalEventHandlers, this.subscriptionManagerFactory.create(), portal);
var eventProcessorConfiguration = new PortalEventProcessorConfiguration(this.portalEventHandlers, optionalPortal.get(), this.messageService);
var portalEventProcessor = new PortalEventProcessor(editingContext, this.representationSearchService, this.representationPersistenceService,
this.subscriptionManagerFactory.create(), this.meterRegistry, eventProcessorConfiguration);
return Optional.of(portalEventProcessor)
.filter(representationEventProcessorClass::isInstance)
.map(representationEventProcessorClass::cast);
.filter(representationEventProcessorClass::isInstance)
.map(representationEventProcessorClass::cast);
}
}
return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,13 @@
*******************************************************************************/
package org.eclipse.sirius.components.collaborative.portals.api;

import org.eclipse.sirius.components.collaborative.api.ChangeDescription;
import org.eclipse.sirius.components.core.api.IPayload;

import reactor.core.publisher.Sinks.Many;
import reactor.core.publisher.Sinks.One;

/**
* Interface of all the portal event handlers.
*
* @author pcdavid
*/
public interface IPortalEventHandler {
boolean canHandle(IPortalInput portalInput);

void handle(One<IPayload> payloadSink, Many<ChangeDescription> changeDescriptionSink, PortalContext context);
boolean canHandle(PortalRequest request);

void handle(PortalRequest request, PortalResponse response);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,18 @@
package org.eclipse.sirius.components.collaborative.portals.api;

import java.util.Objects;
import java.util.Optional;

import org.eclipse.sirius.components.collaborative.api.IRepresentationSearchService;
import org.eclipse.sirius.components.collaborative.portals.services.PortalServices;
import org.eclipse.sirius.components.core.api.IEditingContext;
import org.eclipse.sirius.components.portals.Portal;

/**
* Context for the execution of a portal event handler.
* Encapsulates all the input for a portal event handler.
*
* @author pcdavid
*/
public class PortalContext {
public class PortalRequest {
private final IRepresentationSearchService representationSearchService;

private final IEditingContext editingContext;
Expand All @@ -34,9 +33,7 @@ public class PortalContext {

private final IPortalInput input;

private Optional<Portal> nextPortal = Optional.empty();

public PortalContext(IRepresentationSearchService representationSearchService, IEditingContext editingContext, Portal currentPortal, IPortalInput input) {
public PortalRequest(IRepresentationSearchService representationSearchService, IEditingContext editingContext, Portal currentPortal, IPortalInput input) {
this.representationSearchService = Objects.requireNonNull(representationSearchService);
this.editingContext = Objects.requireNonNull(editingContext);
this.currentPortal = Objects.requireNonNull(currentPortal);
Expand All @@ -58,13 +55,4 @@ public Portal getCurrentPortal() {
public IPortalInput getInput() {
return this.input;
}

public Optional<Portal> getNextPortal() {
return this.nextPortal;
}

public void setNextPortal(Portal nextPortal) {
this.nextPortal = Optional.ofNullable(nextPortal);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*******************************************************************************
* Copyright (c) 2024 Obeo.
* This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
* Obeo - initial API and implementation
*******************************************************************************/
package org.eclipse.sirius.components.collaborative.portals.api;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

import org.eclipse.sirius.components.collaborative.api.ChangeDescription;
import org.eclipse.sirius.components.core.api.IPayload;
import org.eclipse.sirius.components.portals.Portal;

/**
* Encapsulates the response from a portal event handler.
*
* @author pcdavid
*/
public class PortalResponse {
private IPayload payload;

private Optional<Portal> nextPortal = Optional.empty();

private List<ChangeDescription> changeDescriptions = new ArrayList<>();

public Optional<Portal> getNextPortal() {
return this.nextPortal;
}

public void setNextPortal(Portal nextPortal) {
this.nextPortal = Optional.ofNullable(nextPortal);
}

public void addChangeDescription(ChangeDescription changeDescription) {
this.changeDescriptions.add(Objects.requireNonNull(changeDescription));
}

public List<ChangeDescription> getChangeDescriptions() {
return this.changeDescriptions;
}

public void setPayload(IPayload payload) {
this.payload = Objects.requireNonNull(payload);
}

public IPayload getPayload() {
return this.payload;
}
}
Loading

0 comments on commit 9e6e4ca

Please sign in to comment.