Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend bidirectional provider by introducing sample interval filtering #141

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion proto/kuksa/val/v2/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,24 @@ message Value {
}
}

message SampleInterval {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why SampleInterval? Why not SampleRate, or maybe Frequency, maybe as mentioned in some of the other findings it can be inlined because it's just an object with a single property.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is used in Metadata and Filter messages, wanted to keep same semantic and meaning between both messages.

int32 max_interval_ms = 1;
}

enum FilterError {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this shall be aligned with grpc errors? because we encountered difficulties with own error codes and stuff like that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of this error enum was to transmit Provider's context errors, like can network error, provider overloaded, etc

FILTER_ERROR_CODE_UNSPECIFIED = 0;
FILTER_ERROR_CODE_OK = 1;
FILTER_ERROR_CODE_NETWORK_ERROR = 2;
FILTER_ERROR_CODE_OVERLOAD = 3;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't there be more errors like

  • Unknown signal id

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WIll added it


message Filter {
// Duration of the active call. If it is not set, call will least for ever.
int32 duration_ms = 1;
// Max desired sample update interval.
SampleInterval max_sample_interval = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would use int32 inline

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since it will be also used in Metadata I decided to create its own message to keep the same semantic. What do you think?

}

message SignalID {
oneof signal {
// Numeric identifier to the signal
Expand Down Expand Up @@ -74,6 +92,13 @@ enum ErrorCode {
ERROR_CODE_PERMISSION_DENIED = 4;
}

enum CommandError {
COMMAND_ERROR_CODE_UNSPECIFIED = 0; // Default value, never to be explicitly set,
COMMAND_ERROR_CODE_OK = 1;
COMMAND_ERROR_CODE_NETWORK_ERROR = 2;
COMMAND_ERROR_CODE_OVERLOAD = 3;
}

message Metadata {

// Full dot notated path for the signal
Expand Down Expand Up @@ -113,7 +138,9 @@ message Metadata {
Value allowed_values = 17; // Must be of array type
Value min = 18;
Value max = 19;


// Maximum sample interval at which its provider will publish the signal value
SampleInterval sample_interval = 20;
}

// VSS Data type of a signal
Expand Down
55 changes: 50 additions & 5 deletions proto/kuksa/val/v2/val.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ service VAL {
// PERMISSION_DENIED if access is denied
// INVALID_ARGUMENT if the request is empty or provided path is too long
// - MAX_REQUEST_PATH_LENGTH: usize = 1000;
// UNAVAILABLE if there is no provider currently providing the signal
//
rpc GetValue(GetValueRequest) returns (GetValueResponse);

Expand All @@ -44,6 +45,7 @@ service VAL {
// PERMISSION_DENIED if access is denied for any of the requested signals.
// INVALID_ARGUMENT if the request is empty or provided path is too long
// - MAX_REQUEST_PATH_LENGTH: usize = 1000;
// UNAVAILABLE if there is no provider currently providing the signals
//
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using UNAVAILABBLE for this purpose has some drawbacks:

  1. Clients cannot easily distinguish, if just the signal has no provider or if the whole data broker is unavailable
  2. I would strongly prefer to get the state of the other available signals even if some of them are "not provided" at the moment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided after meeting to remove it and keep the non availability of provider as None in datapoint

rpc GetValues(GetValuesRequest) returns (GetValuesResponse);

Expand All @@ -57,6 +59,7 @@ service VAL {
// MAX_REQUEST_PATH_LENGTH: usize = 1000;
// - if buffer_size exceeds the maximum permitted
// MAX_BUFFER_SIZE: usize = 1000;
// UNAVAILABLE if there is no provider currently providing the signal
//
// When subscribing, Databroker shall immediately return the value for all
// subscribed entries.
Expand All @@ -77,6 +80,7 @@ service VAL {
// MAX_REQUEST_PATH_LENGTH: usize = 1000;
// - if buffer_size exceeds the maximum permitted
// MAX_BUFFER_SIZE: usize = 1000;
// UNAVAILABLE if there is no provider currently providing the signal
//
// When subscribing, Databroker shall immediately return the value for all
// subscribed entries.
Expand Down Expand Up @@ -131,6 +135,7 @@ service VAL {
// NOT_FOUND if the specified root branch does not exist.
// UNAUTHENTICATED if no credentials provided or credentials has expired
// INVALID_ARGUMENT if the provided path or wildcard is wrong.
// UNAVAILABLE if there is no provider currently providing the signal -> do we want this error for list metadata?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't do this: This prevents clients - including providers - from getting the metadata of a signal if a provider for it is not (yet) available.

//
rpc ListMetadata(ListMetadataRequest) returns (ListMetadataResponse);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add an error to PublishValue for the case that the addressed signal is already "claimed" by another provider (via OpenProviderStream -> ProvideSignalRequest). E.g.
// RESOURCE_EXHAUSTED if the addressed signal is already occupied by another provider
Or what is the current behavior of the broker in those cases?

Expand Down Expand Up @@ -213,6 +218,7 @@ message SubscribeRequest {
// Default (0) results in that only latest message is kept.
// Maximum value supported is implementation dependent.
uint32 buffer_size = 2;
Filter filter = 3;
}

message SubscribeResponse {
Expand All @@ -227,6 +233,7 @@ message SubscribeByIdRequest {
// Default (0) results in that only latest message is kept.
// Maximum value supported is implementation dependent.
uint32 buffer_size = 2;
Filter filter = 3;
}

message SubscribeByIdResponse {
Expand Down Expand Up @@ -268,16 +275,13 @@ message PublishValueRequest {
Datapoint data_point = 2;
}

message PublishValueResponse {
}

Comment on lines -271 to -273
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops?

message PublishValuesRequest {
int32 request_id = 1; /// Unique request id for the stream that can be used to identify the response.
uint64 request_it = 1; /// Unique request id for the stream that can be used to identify the command.
map<int32, Datapoint> data_points = 2;
}

message PublishValuesResponse {
int32 request_id = 1;
uint64 request_id = 1;
map<int32, Error> status = 2;
}

Expand All @@ -288,6 +292,13 @@ message ProvideActuationRequest {
message ProvideActuationResponse {
}

message ProvideSignalRequest {
map<int32, SampleInterval> signals_sample_intervals = 1;
}

message ProvideSignalResponse {
}

Comment on lines 288 to +297
Copy link
Contributor

@BjoernAtBosch BjoernAtBosch Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't recognize till now that the ProvideSignalRequest/Response is also new.

We should specify the behavior when a provider freshly claims a signal: Does he immediately start to send updates or does he wait until a UpdateFilterRequest comes in? Or should we send back a filter map in the response (like in the UpdateFilterRequest) which is defining the initial filter? What about "legacy" providers not knowing the filter concept?

And BTW: We did not specify the behavior if already another provider has claimed signals for actuation or provisioning. Maybe there should be some respective "error signalling" in the responses.

message BatchActuateStreamRequest {
repeated ActuateRequest actuate_requests = 1;
}
Expand All @@ -298,6 +309,28 @@ message BatchActuateStreamResponse {
Error error = 2;
}

message UpdateFilterRequest {
// Databroker sends filters to provider.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A request id isn't required here?
Can we make sure, that the data broker send only one UpdateFilterRequest to a provider and waits for the response before sending another UpdateFilterRequest?

// In case provider restarts, databroker will send local filters stored
// to continue the provider sending same signals with same filter.
map<int32, Filter> filters_update = 1;
}

// Only returned if there is a filter error on provider side
message UpdateFilterResponse {
FilterError filter_error = 1;
}

message GetValueStreamRequest {
uint64 request_it = 1; /// Unique request id for the stream that can be used to identify the request.
GetValueRequest request = 2;
}

message GetValueStreamResponse {
uint64 request_it = 1; /// Unique request id for the stream that can be used to identify the response.
GetValueResponse response = 2;
}

message OpenProviderStreamRequest {
oneof action {
// Inform server of an actuator this provider provides.
Expand All @@ -307,6 +340,12 @@ message OpenProviderStreamRequest {
// Sent to acknowledge the acceptance of a batch actuate
// request.
BatchActuateStreamResponse batch_actuate_stream_response = 3;
// Inform server of a signal this provider provides.
ProvideSignalRequest provide_signal_request = 4;
// Update filter response
UpdateFilterResponse update_filter_response = 5;
// GetValue response
GetValueStreamResponse get_value_stream_response = 6;
}
}

Expand All @@ -318,6 +357,12 @@ message OpenProviderStreamResponse {
PublishValuesResponse publish_values_response = 2;
// Send a batch actuate request to a provider.
BatchActuateStreamRequest batch_actuate_stream_request = 3;
// Response to a provide sensor request.
ProvideSignalResponse provide_signal_response = 4;
// Filter request
UpdateFilterRequest update_filter_request = 5;
// GetValue request from client forwarded to provider
GetValueStreamRequest get_value_stream_request = 6;
}
}

Expand Down
Loading