Skip to content

Commit

Permalink
Handle HTTP binding error.
Browse files Browse the repository at this point in the history
Signed-off-by: Artur Souza <asouza.pro@gmail.com>
  • Loading branch information
artursouza committed Mar 1, 2024
1 parent d55095c commit a1460be
Show file tree
Hide file tree
Showing 10 changed files with 410 additions and 93 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.13.0-rc.1
DAPR_RUNTIME_VER: 1.13.0-rc.2
DAPR_RUNTIME_VER: 1.13.0-rc.10
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.13.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:
GOPROXY: https://proxy.golang.org
JDK_VER: ${{ matrix.java }}
DAPR_CLI_VER: 1.13.0-rc.1
DAPR_RUNTIME_VER: 1.13.0-rc.5
DAPR_RUNTIME_VER: 1.13.0-rc.10
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/v1.13.0-rc.1/install/install.sh
DAPR_CLI_REF:
DAPR_REF:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.dapr.actors.client;

import com.google.protobuf.ByteString;
import io.dapr.client.DaprClientGrpc;
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
Expand Down Expand Up @@ -128,7 +129,7 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
return DaprClientGrpc.intercept(context, client, null);
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
Expand Down
27 changes: 27 additions & 0 deletions sdk-tests/components/http_binding.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
scopes:
- bindingit-httpoutputbinding-exception
---
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: github-http-binding-404-success
spec:
type: bindings.http
version: v1
metadata:
- name: url
value: https://api.github.com/unknown_path
- name: errorIfNot2XX
value: "false"
scopes:
- bindingit-httpoutputbinding-ignore-error
82 changes: 71 additions & 11 deletions sdk-tests/src/test/java/io/dapr/it/binding/http/BindingIT.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021 The Dapr Authors
* Copyright 2024 The Dapr Authors
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -17,9 +17,9 @@
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import io.dapr.client.domain.HttpExtension;
import io.dapr.exceptions.DaprException;
import io.dapr.it.BaseIT;
import io.dapr.it.DaprRun;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

Expand All @@ -28,28 +28,81 @@

import static io.dapr.it.Retry.callWithRetry;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Service for input and output binding example.
*/
public class BindingIT extends BaseIT {

private static final String BINDING_NAME = "sample123";
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void httpOutputBindingError(boolean useGrpc) throws Exception {
DaprRun daprRun = startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-exception",
60000);
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}

private static final String BINDING_OPERATION = "create";
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// This HTTP binding did not set `errorIfNot2XX` to false in component metadata, so the error payload is not
// consistent between HTTP and gRPC.
assertTrue(new String(e.getPayload()).contains(
"error invoking output binding github-http-binding-404: received status code 404"));
}
}, 10000);
}
}

public static class MyClass {
public MyClass() {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void httpOutputBindingErrorIgnoredByComponent(boolean useGrpc) throws Exception {
DaprRun daprRun = startDaprApp(
this.getClass().getSimpleName() + "-httpoutputbinding-ignore-error",
60000);
// At this point, it is guaranteed that the service above is running and all ports being listened to.
if (useGrpc) {
daprRun.switchToGRPC();
} else {
daprRun.switchToHTTP();
}

public String message;
try(DaprClient client = new DaprClientBuilder().build()) {
// Validate error message
callWithRetry(() -> {
System.out.println("Checking exception handling for output binding ...");
try {
client.invokeBinding("github-http-binding-404-success", "get", "").block();
fail("Should throw an exception");
} catch (DaprException e) {
assertEquals(404, e.getHttpStatusCode());
// The HTTP binding must set `errorIfNot2XX` to false in component metadata for the error payload to be
// consistent between HTTP and gRPC.
assertEquals(
"{\"message\":\"Not Found\",\"documentation_url\":\"https://docs.github.com/rest\"}",
new String(e.getPayload()));
}
}, 10000);
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void inputOutputBinding(boolean useGrpc) throws Exception {
System.out.println("Working Directory = " + System.getProperty("user.dir"));
final String bidingName = "sample123";
String serviceNameVariant = useGrpc ? "-grpc" : "-http";

DaprRun daprRun = startDaprApp(
Expand All @@ -69,7 +122,7 @@ public void inputOutputBinding(boolean useGrpc) throws Exception {
callWithRetry(() -> {
System.out.println("Checking if input binding is up before publishing events ...");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, "ping").block();
bidingName, "create", "ping").block();

try {
Thread.sleep(1000);
Expand All @@ -88,14 +141,14 @@ public void inputOutputBinding(boolean useGrpc) throws Exception {

System.out.println("sending first message");
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", myClass, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();

// This is an example of sending a plain string. The input binding will receive
// cat
final String m = "cat";
System.out.println("sending " + m);
client.invokeBinding(
BINDING_NAME, BINDING_OPERATION, m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();
bidingName, "create", m, Collections.singletonMap("MyMetadata", "MyValue"), Void.class).block();

// Metadata is not used by Kafka component, so it is not possible to validate.
callWithRetry(() -> {
Expand Down Expand Up @@ -127,4 +180,11 @@ public void inputOutputBinding(boolean useGrpc) throws Exception {
}, 8000);
}
}

public static class MyClass {
public MyClass() {
}

public String message;
}
}
90 changes: 79 additions & 11 deletions sdk/src/main/java/io/dapr/client/DaprClientGrpc.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import io.dapr.client.resiliency.ResiliencyOptions;
import io.dapr.config.Properties;
import io.dapr.exceptions.DaprException;
import io.dapr.internal.opencensus.GrpcWrapper;
import io.dapr.internal.exceptions.DaprHttpException;
import io.dapr.internal.resiliency.RetryPolicy;
import io.dapr.internal.resiliency.TimeoutPolicy;
import io.dapr.serializer.DaprObjectSerializer;
Expand All @@ -65,6 +65,7 @@
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ForwardingClientCallListener;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.stub.StreamObserver;
Expand All @@ -81,10 +82,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static io.dapr.internal.exceptions.DaprHttpException.isSuccessfulHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.isValidHttpStatusCode;
import static io.dapr.internal.exceptions.DaprHttpException.parseHttpStatusCode;
import static io.dapr.internal.opencensus.GrpcWrapper.appendTracingToMetadata;

/**
* An adapter for the GRPC Client.
*
Expand Down Expand Up @@ -351,6 +356,7 @@ public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef
*/
@Override
public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type) {
Metadata responseMetadata = new Metadata();
try {
final String name = request.getName();
final String operation = request.getOperation();
Expand All @@ -377,10 +383,19 @@ public <T> Mono<T> invokeBinding(InvokeBindingRequest request, TypeRef<T> type)

return Mono.deferContextual(
context -> this.<DaprProtos.InvokeBindingResponse>createMono(
it -> intercept(context, asyncStub).invokeBinding(envelope, it)
responseMetadata,
it -> intercept(context, asyncStub, m -> responseMetadata.merge(m)).invokeBinding(envelope, it)
)
).flatMap(
it -> {
int httpStatusCode =
parseHttpStatusCode(it.getMetadataMap().getOrDefault("statusCode", ""));
if (isValidHttpStatusCode(httpStatusCode) && !isSuccessfulHttpStatusCode(httpStatusCode)) {
// Exception condition in a successful request.
// This is useful to send an exception due to an error from the HTTP binding component.
throw DaprException.propagate(new DaprHttpException(httpStatusCode, it.getData().toByteArray()));

Check warning on line 396 in sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

View check run for this annotation

Codecov / codecov/patch

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java#L396

Added line #L396 was not covered by tests
}

try {
return Mono.justOrEmpty(objectSerializer.deserialize(it.getData().toByteArray(), type));
} catch (IOException e) {
Expand Down Expand Up @@ -1155,21 +1170,74 @@ public void start(final Listener<RespT> responseListener, final Metadata metadat
* @param client GRPC client for Dapr.
* @return Client after adding interceptors.
*/
private static DaprGrpc.DaprStub intercept(ContextView context, DaprGrpc.DaprStub client) {
return GrpcWrapper.intercept(context, client);
private static DaprGrpc.DaprStub intercept(
ContextView context,
DaprGrpc.DaprStub client) {
return intercept(context, client, null);
}

/**
* Populates GRPC client with interceptors for telemetry - internal use only.
*
* @param context Reactor's context.
* @param client GRPC client for Dapr.
* @param metadataConsumer Handles metadata result.
* @return Client after adding interceptors.
*/
public static DaprGrpc.DaprStub intercept(
ContextView context,
DaprGrpc.DaprStub client,
Consumer<Metadata> metadataConsumer) {
ClientInterceptor interceptor = new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> methodDescriptor,
CallOptions options,
Channel channel) {
ClientCall<ReqT, RespT> clientCall = channel.newCall(methodDescriptor, options);
return new ForwardingClientCall.SimpleForwardingClientCall<>(clientCall) {
@Override
public void start(final Listener<RespT> responseListener, final Metadata metadata) {
appendTracingToMetadata(context, metadata);

final ClientCall.Listener<RespT> headerListener =
new ForwardingClientCallListener.SimpleForwardingClientCallListener<>(responseListener) {
@Override
public void onHeaders(Metadata headers) {
responseListener.onHeaders(headers);
if (metadataConsumer != null) {
metadataConsumer.accept(headers);

Check warning on line 1209 in sdk/src/main/java/io/dapr/client/DaprClientGrpc.java

View check run for this annotation

Codecov / codecov/patch

sdk/src/main/java/io/dapr/client/DaprClientGrpc.java#L1209

Added line #L1209 was not covered by tests
}
}
};
super.start(headerListener, metadata);
}
};
}
};
return client.withInterceptors(interceptor);
}

private <T> Mono<T> createMono(Consumer<StreamObserver<T>> consumer) {
return this.createMono(null, consumer);
}

private <T> Mono<T> createMono(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Mono.create(sink -> DaprException.wrap(() -> consumer.accept(
createStreamObserver(sink, metadata))).run()));
}

private <T> Flux<T> createFlux(Consumer<StreamObserver<T>> consumer) {
return this.createFlux(null, consumer);
}

private <T> Flux<T> createFlux(Metadata metadata, Consumer<StreamObserver<T>> consumer) {
return retryPolicy.apply(
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink))).run()));
Flux.create(sink -> DaprException.wrap(() -> consumer.accept(createStreamObserver(sink, metadata))).run()));
}

private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(MonoSink<T> sink, Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
Expand All @@ -1178,7 +1246,7 @@ public void onNext(T value) {

@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}

@Override
Expand All @@ -1188,7 +1256,7 @@ public void onCompleted() {
};
}

private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink) {
private <T> StreamObserver<T> createStreamObserver(FluxSink<T> sink, final Metadata grpcMetadata) {
return new StreamObserver<T>() {
@Override
public void onNext(T value) {
Expand All @@ -1197,7 +1265,7 @@ public void onNext(T value) {

@Override
public void onError(Throwable t) {
sink.error(DaprException.propagate(new ExecutionException(t)));
sink.error(DaprException.propagate(DaprHttpException.fromGrpcExecutionException(grpcMetadata, t)));
}

@Override
Expand Down
Loading

0 comments on commit a1460be

Please sign in to comment.