Skip to content

Commit

Permalink
runtime: always reply on invocation fail
Browse files Browse the repository at this point in the history
If any kind of failure occurs when proxying a gRPC method invocation,
ensure that an error response is sent on the invocation's response
channel to prevent the caller blocking forever.
  • Loading branch information
daviddrysdale committed Apr 6, 2020
1 parent d88f934 commit e426559
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 0 deletions.
24 changes: 24 additions & 0 deletions oak/server/grpc_client_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ bool GrpcClientNode::HandleInvocation(Handle invocation_handle) {
cq.Next(&got_tag, &ok);
if (!ok) {
OAK_LOG(ERROR) << "Failed to start gRPC method call";
FailInvocation(invocation->rsp_handle.get(),
grpc::Status(grpc::StatusCode::INTERNAL, "failed to start method call"));
return false;
}

Expand All @@ -87,11 +89,15 @@ bool GrpcClientNode::HandleInvocation(Handle invocation_handle) {
cq.Next(&got_tag, &ok);
if (!ok) {
OAK_LOG(ERROR) << "Failed to send gRPC request";
FailInvocation(invocation->rsp_handle.get(),
grpc::Status(grpc::StatusCode::INTERNAL, "failed to write request"));
return false;
}
call->WritesDone(tag(3));
cq.Next(&got_tag, &ok);
if (!ok) {
FailInvocation(invocation->rsp_handle.get(),
grpc::Status(grpc::StatusCode::INTERNAL, "failed to close request"));
OAK_LOG(ERROR) << "Failed to close gRPC request";
return false;
}
Expand Down Expand Up @@ -138,6 +144,8 @@ bool GrpcClientNode::HandleInvocation(Handle invocation_handle) {
}
if (!status.ok()) {
// Final status includes an error, so pass it back on the response channel.
FailInvocation(invocation->rsp_handle.get(), status);

oak::GrpcResponse grpc_rsp;
grpc_rsp.set_last(true);
grpc_rsp.mutable_status()->set_code(status.error_code());
Expand All @@ -158,6 +166,22 @@ bool GrpcClientNode::HandleInvocation(Handle invocation_handle) {
return true;
}

void GrpcClientNode::FailInvocation(Handle rsp_handle, grpc::Status status) {
oak::GrpcResponse grpc_rsp;
grpc_rsp.set_last(true);
grpc_rsp.mutable_status()->set_code(status.error_code());
grpc_rsp.mutable_status()->set_message(status.error_message());

auto rsp_msg = absl::make_unique<NodeMessage>();
size_t serialized_size = grpc_rsp.ByteSizeLong();
rsp_msg->data.resize(serialized_size);
grpc_rsp.SerializeToArray(rsp_msg->data.data(), rsp_msg->data.size());

OAK_LOG(INFO) << "Write final gRPC status of (" << status.error_code() << ", '"
<< status.error_message() << "') to response channel";
ChannelWrite(rsp_handle, std::move(rsp_msg));
}

void GrpcClientNode::Run(Handle invocation_handle) {
std::vector<std::unique_ptr<ChannelStatus>> channel_status;
channel_status.push_back(absl::make_unique<ChannelStatus>(invocation_handle));
Expand Down
1 change: 1 addition & 0 deletions oak/server/grpc_client_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class GrpcClientNode final : public NodeThread {

private:
bool HandleInvocation(Handle invocation_handle);
void FailInvocation(Handle rsp_handle, grpc::Status status);
void Run(Handle handle) override;

std::shared_ptr<grpc::ChannelInterface> channel_;
Expand Down

0 comments on commit e426559

Please sign in to comment.