Skip to content

Commit

Permalink
bug handling batch commands
Browse files Browse the repository at this point in the history
  • Loading branch information
larousso committed Jul 18, 2024
1 parent 9a6ecd9 commit 8a2e832
Showing 1 changed file with 59 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import io.vavr.Tuple0;
import io.vavr.Tuple3;
import io.vavr.Value;
import io.vavr.collection.HashMap;
import io.vavr.collection.List;
import io.vavr.collection.Map;
import io.vavr.collection.Seq;
Expand All @@ -18,17 +17,15 @@
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

import static io.vavr.API.List;
import static io.vavr.API.None;
import static io.vavr.API.Tuple;
import static fr.maif.concurrent.CompletionStages.traverse;
import static io.vavr.API.*;
import static java.util.function.Function.identity;

public class EventProcessorImpl<Error, S extends State<S>, C extends Command<Meta, Context>, E extends Event, TxCtx, Message, Meta, Context> implements EventProcessor<Error, S, C, E, TxCtx, Message, Meta, Context> {
Expand Down Expand Up @@ -64,15 +61,30 @@ public CompletionStage<List<Either<Error, ProcessingSuccess<S, E, Meta, Context,
LOGGER.debug("Processing commands {}", commands);
return transactionManager
.withTransaction(ctx -> batchProcessCommand(ctx, commands))
.thenCompose(listInTransactionResult ->
listInTransactionResult.postTransaction()
);
.thenCompose(InTransactionResult::postTransaction);
}

record PreparedMessage<C extends Command<Meta, Context>, S extends State<S>, E extends Event, M, Meta, Context>(CommandAndInfos<C, S, E, M, Meta, Context> command, E event, Long seq, Integer num, Integer total, String transactionId) {}

record CommandAndInfos<C extends Command<Meta, Context>, S extends State<S>, E extends Event, M, Meta, Context>(int tmpCommandId, C command, Option<S> mayBeState, Events<E, M> events) {
@Override
public boolean equals(Object obj) {
if (obj instanceof CommandAndInfos<?, ?, ?, ?, ?, ?> commandAndInfos) {
return tmpCommandId == commandAndInfos.tmpCommandId;
}
return false;
}

@Override
public int hashCode() {
return Objects.hashCode(tmpCommandId);
}
}

@Override
public CompletionStage<InTransactionResult<List<Either<Error, ProcessingSuccess<S, E, Meta, Context, Message>>>>> batchProcessCommand(TxCtx ctx, List<C> commands) {
// Collect all states from db
return aggregateStore.getAggregates(ctx, commands.filter(c -> c.hasId()).map(c -> c.entityId().get()))
return aggregateStore.getAggregates(ctx, commands.filter(Command::hasId).map(c -> c.entityId().get()))
.thenCompose(states ->
traverseCommands(commands, (c, events) -> {
//handle command with state to get events
Expand All @@ -87,28 +99,48 @@ public CompletionStage<InTransactionResult<List<Either<Error, ProcessingSuccess<
.map(Tuple3::_3)
.flatMap(Either::swap)
.map(Either::left);
AtomicInteger counter = new AtomicInteger(0);
// Collect all successes : command + List<E>
List<CommandAndInfos<C, S, E, Message, Meta, Context>> successes = commandsAndResults.flatMap(t -> t._3.map(events ->
// Here we generate a tmp command id because we could have 2 commands for the same entity in the same batch
new CommandAndInfos<>(counter.getAndIncrement(), t._1, t._2, events))
);

// Flatten by events : command + E (a join between command and events)
List<PreparedMessage<C, S, E, Message, Meta, Context>> preparedMessages = successes.flatMap(commandAndInfos -> {
String transactionId = transactionManager.transactionId();
Integer totalMessages = commandAndInfos.events.events.size();
return commandAndInfos.events.events.zipWithIndex().map(evt ->
new PreparedMessage<>(commandAndInfos, evt._1, 0L, evt._2, totalMessages, transactionId)
);
});

Map<String, C> commandsById = HashMap.ofEntries(commandsAndResults.flatMap(t -> t._3.toList().flatMap(any -> any.events.map(e -> e.entityId()).distinct().map(id -> Tuple.of(id, t._1)))));
Map<String, Message> messageById = HashMap.ofEntries(commandsAndResults.flatMap(t -> t._3.map(any -> Tuple(t._1.entityId().get(), any.message))));
Map<String, Option<S>> statesById = HashMap.ofEntries(commandsAndResults.flatMap(t -> t._3.map(any -> Tuple(t._1.entityId().get(), t._2))));
List<E> allEvents = commandsAndResults.flatMap(t -> t._3.map(ev -> ev.events)).flatMap(identity());
Map<String, List<E>> eventsById = allEvents.groupBy(Event::entityId);
List<E> allEvents = successes.flatMap(t -> t.events.events);

CompletionStage<List<CommandStateAndEvent>> success = eventStore.nextSequences(ctx, allEvents.size())
CompletionStage<List<CommandStateAndEvent>> success =
// Generate sequences from DB
eventStore.nextSequences(ctx, allEvents.size())
.thenApply(sequences ->
buildEnvelopes(ctx, commandsById, sequences, allEvents)
// apply a sequence to an event
preparedMessages.zip(sequences).map(t -> {
Long seq = t._2;
PreparedMessage<C, S, E, Message, Meta, Context> message = t._1;
// Build the envelope that will be stored in DB later
EventEnvelope<E, Meta, Context> eventEnvelope = buildEnvelope(ctx, message.command().command(), message.event(), seq, message.num(), message.total(), message.transactionId());
return Tuple(message.command, eventEnvelope);
})
)
.thenApply(allEnvelopes -> {
Map<String, List<EventEnvelope<E, Meta, Context>>> indexed = allEnvelopes.groupBy(env -> commandsById.get(env.entityId).get().entityId().get());
return indexed.map(t -> {
String entityId = t._1;
// group envelope by original command
Map<CommandAndInfos<C, S, E, Message, Meta, Context>, List<EventEnvelope<E, Meta, Context>>> indexedByCommandId = allEnvelopes
.groupBy(env -> env._1)
.mapValues(l -> l.map(t -> t._2));
// for each original command, we prepare the result that we be returned
return indexedByCommandId.map(t -> {
CommandAndInfos<C, S, E, Message, Meta, Context> commandAndInfos = t._1;
List<EventEnvelope<E, Meta, Context>> eventEnvelopes = t._2;
C command = commandsById.get(entityId).get();
Option<S> mayBeState = statesById.get(entityId).get();
List<E> events = eventsById.getOrElse(entityId, List.empty());
Option<Long> mayBeLastSeqNum = eventEnvelopes.lastOption().map(evl -> evl.sequenceNum);
Message message = messageById.get(entityId).get();
return new CommandStateAndEvent(command, mayBeState, eventEnvelopes, events, message, mayBeLastSeqNum);
var mayBeLastSeqNum = eventEnvelopes.map(e -> e.sequenceNum).max();
return new CommandStateAndEvent(commandAndInfos.command, commandAndInfos.mayBeState, eventEnvelopes, commandAndInfos.events.events.toList(), commandAndInfos.events.message, mayBeLastSeqNum);
}).toList();

});
Expand Down Expand Up @@ -165,11 +197,10 @@ public CompletionStage<InTransactionResult<List<Either<Error, ProcessingSuccess<
.thenApply(__ -> Tuple.empty())
.exceptionally(e -> Tuple.empty());
};
var inTransactionResult = new InTransactionResult<>(
return new InTransactionResult<>(
results,
postTransactionProcess
);
return inTransactionResult;
})
);
}
Expand All @@ -187,14 +218,6 @@ public CompletionStage<List<Tuple3<C, Option<S>, Either<Error, Events<E, Message
).thenApply(t -> t._1);
}

List<EventEnvelope<E, Meta, Context>> buildEnvelopes(TxCtx tx, Map<String, C> commands, List<Long> sequences, List<E> events) {
String transactionId = transactionManager.transactionId();
int nbMessages = events.length();
return events.zip(sequences).zipWithIndex().map(t ->
buildEnvelope(tx, commands.get(t._1._1.entityId()).get(), t._1._1, t._1._2, t._2, nbMessages, transactionId)
);
}

private CompletionStage<Either<Error, Events<E, Message>>> handleCommand(TxCtx txCtx, Option<S> state, C command) {
return commandHandler.handleCommand(txCtx, state, command);
}
Expand Down

0 comments on commit 8a2e832

Please sign in to comment.