Skip to content
This repository has been archived by the owner on Mar 13, 2021. It is now read-only.

Commit

Permalink
Propagate description of server errors, incl fn not found.
Browse files Browse the repository at this point in the history
Fixes #165
  • Loading branch information
ericbottard authored and Florent Biville committed Nov 6, 2019
1 parent eb02bb4 commit 3e73163
Showing 1 changed file with 7 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.google.protobuf.ByteString;
import com.google.protobuf.ProtocolStringList;
import io.grpc.Status;
import io.grpc.StatusException;
import io.projectriff.invoker.rpc.InputSignal;
import io.projectriff.invoker.rpc.OutputFrame;
import io.projectriff.invoker.rpc.OutputSignal;
Expand Down Expand Up @@ -51,18 +53,21 @@ public Flux<OutputSignal> invoke(Flux<InputSignal> request) {
return request
.switchOnFirst((first, stream) -> {
if (!first.hasValue() || !first.get().hasStart()) {
return Flux.error(new RuntimeException("Expected first frame to be of type Start"));
return Flux.error(Status.INVALID_ARGUMENT.withDescription("Expected first frame to be of type Start").asException());
}

String[] accept = getExpectedOutputContentTypes(first);

Function<Object, Object> userFn = functionCatalog.lookup(functionName, accept);

if (userFn == null) {
return Flux.error(Status.NOT_FOUND.withDescription("Function could not be located").asException());
}
return stream
.skip(1L)
.map(this::toSpringMessage)
.transform(invoker(userFn))
.map(this::fromSpringMessage)
.onErrorMap(e -> Status.UNKNOWN.withDescription(e.getMessage()).withCause(e).asException())
.doOnError(Throwable::printStackTrace);
});
}
Expand Down

0 comments on commit 3e73163

Please sign in to comment.