Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extending BatchActuateStreamResponse #103

Merged
merged 5 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ use databroker_proto::kuksa::val::v2::{
};

use kuksa::proto::v2::{
signal_id, ActuateRequest, ActuateResponse, BatchActuateStreamRequest, ListMetadataResponse,
ProvideActuationResponse,
signal_id, ActuateRequest, ActuateResponse, BatchActuateStreamRequest, ErrorCode,
ListMetadataResponse, ProvideActuationResponse,
};
use std::collections::HashSet;
use tokio::{select, sync::mpsc};
Expand Down Expand Up @@ -614,8 +614,10 @@ impl proto::val_server::Val for broker::DataBroker {
// e.g. if sending an unsupported enum value
// - if the published value is out of the min/max range specified
//
// - Provider returns BatchActuateStreamResponse <- Databroker sends BatchActuateStreamRequest
// No error definition, a BatchActuateStreamResponse is expected from provider.
// - Databroker sends BatchActuateStreamRequest -> Provider shall return a BatchActuateStreamResponse,
// for every signal requested to indicate if the request was accepted or not.
// It is up to the provider to decide if the stream shall be closed,
// as of today Databroker will not react on the received error message.
//
async fn open_provider_stream(
&self,
Expand Down Expand Up @@ -666,8 +668,30 @@ impl proto::val_server::Val for broker::DataBroker {
}
}
},
Some(BatchActuateStreamResponse(_batch_actuate_stream_response)) => {
// TODO discuss and implement
Some(BatchActuateStreamResponse(batch_actuate_stream_response)) => {

if let Some(error) = batch_actuate_stream_response.error {
match error.code() {
ErrorCode::Ok => {},
_ => {
let mut msg : String = "Batch actuate stream response error".to_string();
if let Some(signal_id) = batch_actuate_stream_response.signal_id {
match signal_id.signal {
Some(proto::signal_id::Signal::Path(path)) => {
msg = format!("{}, path: {}", msg, &path);
}
Some(proto::signal_id::Signal::Id(id)) => {
msg = format!("{}, id: {}",msg, &id.to_string());
}
None => {}
}
}
msg = format!("{}, error code: {}, error message: {}", msg, &error.code.to_string(), &error.message);
debug!(msg)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe this is just a first step in implementing this. But shouldn't the error coming from the provider be forwarded to the originator of the actuate(...) call, i.e. the "consumer"?

Copy link
Contributor Author

@erikbosch erikbosch Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I looked mostly at the Provider rpc.

But I assume we need quite a lot refactoring if we are to return results to the "consumer", some aspects I think of:

  • If Actuate / BatchActuate are to return actual results, the Databroker will need to wait until the providers has replied (for BatchActuate there could be multiple, right?)
  • That means the providers MUST send a response also on OK. Possibly combined with a timeout in databroker so that Actuation is considered unsuccessful if Databroker does not get a response within X seconds.
  • Then we maybe also need some form of the request-id/statefulness, so we know which request the error belong to. At least theoretically there might be multiple consumers trying to change the same signal, right? And it can happen partially in parallel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What you are discussing here was postponed if I remember correctly as we were not sure if we wanted to put databroker to wait until it receives ACK of receipt and forwards it to the consumer.
So in my opinion, this PR is enough for now if databroker can output some debug actuate information.

}
}
}

},
None => {

Expand Down
9 changes: 5 additions & 4 deletions proto/kuksa/val/v2/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ message Error {
}

enum ErrorCode {
OK = 0;
INVALID_ARGUMENT = 1;
NOT_FOUND = 2;
PERMISSION_DENIED = 3;
ERROR_CODE_UNSPECIFIED = 0; // Default value, never to be explicitly set,
ERROR_CODE_OK = 1;
ERROR_CODE_INVALID_ARGUMENT = 2;
ERROR_CODE_NOT_FOUND = 3;
ERROR_CODE_PERMISSION_DENIED = 4;
}

message Metadata {
Expand Down
9 changes: 7 additions & 2 deletions proto/kuksa/val/v2/val.proto
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,10 @@ service VAL {
// e.g. if sending an unsupported enum value
// - if the published value is out of the min/max range specified
//
// - Provider returns BatchActuateStreamResponse <- Databroker sends BatchActuateStreamRequest
// No error definition, a BatchActuateStreamResponse is expected from provider.
// - Databroker sends BatchActuateStreamRequest -> Provider shall return a BatchActuateStreamResponse,
// for every signal requested to indicate if the request was accepted or not.
// It is up to the provider to decide if the stream shall be closed,
// as of today Databroker will not react on the received error message.
//
rpc OpenProviderStream(stream OpenProviderStreamRequest) returns (stream OpenProviderStreamResponse);

Expand Down Expand Up @@ -284,7 +286,10 @@ message BatchActuateStreamRequest {
repeated ActuateRequest actuate_requests = 1;
}

// Message that shall be used by provider to indicate if an actuation request was accepted.
message BatchActuateStreamResponse {
SignalID signal_id = 1;
Error error = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit of a architectural question, but do we really want a single Error enum that will contain every error possible for all functions in the whole service?

Wouldn't it be better to have specific errors for different functions, i.e. an ActuationError that only contains the errors possible when trying to actuate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the enum I used matched quite well. At least I can find use-cases for all of them

  • OK - Good to have a default value that indicates no error
  • INVALID_ARGUMENT - value is received is out of expected range
  • NOT_FOUND - signal id is not known by provider
  • PERMISSION_DENIED - signal is known by provider, but it does not accept that it is changes. A possible example could be that the provider has not registered itself for the signal id, so it does not expect to get it.

Off course one can argue that everything here is INVALID_ARGUMENT, so we do not need the rest. But to me the set seems quite reasonable.

https://github.com/eclipse-kuksa/kuksa-databroker/blob/main/proto/kuksa/val/v2/types.proto#L64

message Error {
  ErrorCode code = 1;
  string message = 2;
}

enum ErrorCode {
  OK                = 0;
  INVALID_ARGUMENT  = 1;
  NOT_FOUND         = 2;
  PERMISSION_DENIED = 3;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this could be further refined in the future I think to use ErrorCode if it meets current needs is sufficient for now.

}

message OpenProviderStreamRequest {
Expand Down