diff --git a/databroker/src/broker.rs b/databroker/src/broker.rs index 3fcc5d64..943b633a 100644 --- a/databroker/src/broker.rs +++ b/databroker/src/broker.rs @@ -132,6 +132,7 @@ pub struct QueryField { #[derive(Debug)] pub struct ChangeNotification { + pub id: i32, pub update: EntryUpdate, pub fields: HashSet, } @@ -775,6 +776,7 @@ impl ChangeSubscription { // fill unit field always update.unit.clone_from(&entry.metadata.unit); notifications.updates.push(ChangeNotification { + id: *id, update, fields: notify_fields, }); @@ -824,6 +826,7 @@ impl ChangeSubscription { notify_fields.insert(Field::ActuatorTarget); } notifications.updates.push(ChangeNotification { + id: *id, update, fields: notify_fields, }); diff --git a/databroker/src/grpc/kuksa_val_v2/val.rs b/databroker/src/grpc/kuksa_val_v2/val.rs index a38b9581..39034d0f 100644 --- a/databroker/src/grpc/kuksa_val_v2/val.rs +++ b/databroker/src/grpc/kuksa_val_v2/val.rs @@ -256,9 +256,59 @@ impl proto::val_server::Val for broker::DataBroker { async fn subscribe_by_id( &self, - _request: tonic::Request, + request: tonic::Request, ) -> Result, tonic::Status> { - Err(tonic::Status::unimplemented("Unimplemented")) + debug!(?request); + let permissions = match request.extensions().get::() { + Some(permissions) => { + debug!(?permissions); + permissions.clone() + } + None => return Err(tonic::Status::unauthenticated("Unauthenticated")), + }; + + let broker = self.authorized_access(&permissions); + + let request = request.into_inner(); + + let signal_ids = request.signal_ids; + let size = signal_ids.len(); + + let mut valid_requests: HashMap> = HashMap::with_capacity(size); + + for id in signal_ids { + valid_requests.insert( + match get_signal( + Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(id)), + }), + &broker, + ) + .await + { + Ok(signal_id) => signal_id, + Err(err) => return Err(err), + }, + vec![broker::Field::Datapoint].into_iter().collect(), + ); + } + + match broker.subscribe(valid_requests).await { + Ok(stream) => { + let stream = convert_to_proto_stream_id(stream, size); + Ok(tonic::Response::new(Box::pin(stream))) + } + Err(SubscriptionError::NotFound) => { + Err(tonic::Status::new(tonic::Code::NotFound, "Path not found")) + } + Err(SubscriptionError::InvalidInput) => Err(tonic::Status::new( + tonic::Code::InvalidArgument, + "Invalid Argument", + )), + Err(SubscriptionError::InternalError) => { + Err(tonic::Status::new(tonic::Code::Internal, "Internal Error")) + } + } } // Actuate a single actuator @@ -1031,6 +1081,26 @@ fn convert_to_proto_stream( }) } +fn convert_to_proto_stream_id( + input: impl Stream, + size: usize, +) -> impl Stream> { + input.map(move |item| { + let mut entries: HashMap = HashMap::with_capacity(size); + for update in item.updates { + let update_datapoint: Option = match update.update.datapoint { + Some(datapoint) => datapoint.into(), + None => None, + }; + if let Some(dp) = update_datapoint { + entries.insert(update.id, dp); + } + } + let response = proto::SubscribeByIdResponse { entries }; + Ok(response) + }) +} + #[cfg(test)] mod tests { use super::*; @@ -1720,7 +1790,6 @@ mod tests { async fn test_publish_value() { let broker = DataBroker::default(); let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); - let f = false; let entry_id = authorized_access .add_entry( @@ -1767,7 +1836,7 @@ mod tests { } Err(status) => { // Handle the error from the publish_value function - assert!(f, "Publish failed with status: {:?}", status); + panic!("Publish failed with status: {:?}", status); } } } @@ -1776,7 +1845,6 @@ mod tests { async fn test_publish_value_signal_id_not_found() { let broker = DataBroker::default(); let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); - let f = false; let _entry_id = authorized_access .add_entry( @@ -1818,7 +1886,7 @@ mod tests { match broker.publish_value(publish_value_request).await { Ok(_) => { // Handle the successful response - assert!(f, "Should not happen!"); + panic!("Should not happen!"); } Err(status) => { // Handle the error from the publish_value function @@ -1828,6 +1896,51 @@ mod tests { } } + async fn publish_value( + broker: &DataBroker, + entry_id: i32, + input_value: Option, + input_timestamp: Option, + ) { + let timestamp = input_timestamp.map(|input_timestamp| input_timestamp.into()); + + let mut request = tonic::Request::new(proto::PublishValueRequest { + signal_id: Some(proto::SignalId { + signal: Some(proto::signal_id::Signal::Id(entry_id)), + }), + data_point: Some(proto::Datapoint { + timestamp, + + value: match input_value { + Some(true) => Some(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(true)), + }), + Some(false) => Some(proto::Value { + typed_value: Some(proto::value::TypedValue::Bool(false)), + }), + None => None, + }, + }), + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + match broker.publish_value(request).await { + Ok(response) => { + // Handle the successful response + let publish_response = response.into_inner(); + + // Check if there is an error in the response + assert_eq!(publish_response, proto::PublishValueResponse {}); + } + Err(status) => { + // Handle the error from the publish_value function + panic!("Publish failed with status: {:?}", status); + } + } + } + /* Test subscribe service method */ @@ -1855,7 +1968,6 @@ mod tests { }, ); - let f = false; match item { Ok(subscribe_response) => { // Process the SubscribeResponse @@ -1871,38 +1983,128 @@ mod tests { assert_eq!(entry1.value, entry2.value); } (Some(entry1), None) => { - assert!(f, "Key '{}' is only in response: {:?}", key, entry1) + panic!("Key '{}' is only in response: {:?}", key, entry1) + } + (None, Some(entry2)) => { + panic!("Key '{}' is only in expected_response: {:?}", key, entry2) } - (None, Some(entry2)) => assert!( - f, - "Key '{}' is only in expected_response: {:?}", - key, entry2 - ), (None, None) => unreachable!(), } } } Err(err) => { - assert!(f, "Error {:?}", err) + panic!("Error {:?}", err) } } } - async fn publish_value( - broker: &DataBroker, - entry_id: i32, - input_value: Option, - input_timestamp: Option, - ) { - let timestamp = input_timestamp.map(|input_timestamp| input_timestamp.into()); + let broker = DataBroker::default(); - let mut request = tonic::Request::new(proto::PublishValueRequest { - signal_id: Some(proto::SignalId { - signal: Some(proto::signal_id::Signal::Id(entry_id)), - }), - data_point: Some(proto::Datapoint { - timestamp, + let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); + let entry_id = authorized_access + .add_entry( + "test.datapoint1".to_string(), + broker::DataType::Bool, + broker::ChangeType::OnChange, + broker::EntryType::Sensor, + "Some Description that Does Not Matter".to_owned(), + None, + None, + ) + .await + .unwrap(); + + if has_value { + publish_value(&broker, entry_id, Some(false), None).await + } + let mut request = tonic::Request::new(proto::SubscribeRequest { + signal_paths: vec!["test.datapoint1".to_string()], + }); + + request + .extensions_mut() + .insert(permissions::ALLOW_ALL.clone()); + + let result = tokio::task::block_in_place(|| { + // Blocking operation here + // Since broker.subscribe is async, you need to run it in an executor + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(broker.subscribe(request)) + }); + + // Publish "true" as value + publish_value(&broker, entry_id, Some(true), None).await; + + // Publish "false" as value + publish_value(&broker, entry_id, Some(false), None).await; + + // Publish "false" again but with new timestamp - as it is not an update we shall not get anything + + let timestamp = std::time::SystemTime::now(); + publish_value(&broker, entry_id, Some(false), timestamp.into()).await; + + // Publish None as value, equals reset + publish_value(&broker, entry_id, None, None).await; + + // Publish "true" as value + + publish_value(&broker, entry_id, Some(true), None).await; + + if let Ok(stream) = result { + // Process the stream by iterating over the items + let mut stream = stream.into_inner(); + + let mut item_count = 0; + while let Some(item) = stream.next().await { + match item_count { + 0 => { + check_stream_next(&item, if has_value { Some(false) } else { None }).await; + } + 1 => { + check_stream_next(&item, Some(true)).await; + } + 2 => { + // As long as value stays as false we do not get anything new, so prepare for None + check_stream_next(&item, Some(false)).await; + } + 3 => { + check_stream_next(&item, None).await; + } + 4 => { + check_stream_next(&item, Some(true)).await; + // And we do not expect more + break; + } + _ => panic!( + "You shouldn't land here too many items reported back to the stream." + ), + } + item_count += 1; + } + // Make sure stream is not closed in advance + assert_eq!(item_count, 4); + } else { + panic!("Something went wrong while getting the stream.") + } + } + + /* + Test subscribe service method by id + */ + async fn test_subscribe_case_by_id(has_value: bool) { + async fn check_stream_next_by_id( + item: &Result, + input_value: Option, + signal_id: i32, + ) { + // Create Datapoint + let mut expected_response: HashMap = HashMap::new(); + // We expect to get an empty response first + expected_response.insert( + signal_id, + proto::Datapoint { + timestamp: None, value: match input_value { Some(true) => Some(proto::Value { typed_value: Some(proto::value::TypedValue::Bool(true)), @@ -1912,28 +2114,34 @@ mod tests { }), None => None, }, - }), - }); - - request - .extensions_mut() - .insert(permissions::ALLOW_ALL.clone()); - match broker.publish_value(request).await { - Ok(response) => { - // Handle the successful response - let publish_response = response.into_inner(); + }, + ); - // Check if there is an error in the response - assert_eq!(publish_response, proto::PublishValueResponse {}); + match item { + Ok(subscribe_response) => { + // Process the SubscribeResponse + let response = &subscribe_response.entries; + assert_eq!(response.len(), expected_response.len()); + for key in response.keys() { + match (response.get(key), expected_response.get(key)) { + (Some(entry1), Some(entry2)) => { + assert_eq!(entry1.value, entry2.value); + } + (Some(entry1), None) => { + panic!("Key '{}' is only in response: {:?}", key, entry1) + } + (None, Some(entry2)) => { + panic!("Key '{}' is only in expected_response: {:?}", key, entry2) + } + (None, None) => unreachable!(), + } + } } - Err(status) => { - // Handle the error from the publish_value function - panic!("Publish failed with status: {:?}", status); + Err(err) => { + panic!("Error {:?}", err) } } } - - let f = false; let broker = DataBroker::default(); let authorized_access = broker.authorized_access(&permissions::ALLOW_ALL); @@ -1954,8 +2162,8 @@ mod tests { publish_value(&broker, entry_id, Some(false), None).await } - let mut request = tonic::Request::new(proto::SubscribeRequest { - signal_paths: vec!["test.datapoint1".to_string()], + let mut request = tonic::Request::new(proto::SubscribeByIdRequest { + signal_ids: vec![entry_id], }); request @@ -1966,7 +2174,7 @@ mod tests { // Blocking operation here // Since broker.subscribe is async, you need to run it in an executor let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(broker.subscribe(request)) + rt.block_on(broker.subscribe_by_id(request)) }); // Publish "true" as value @@ -1995,25 +2203,29 @@ mod tests { while let Some(item) = stream.next().await { match item_count { 0 => { - check_stream_next(&item, if has_value { Some(false) } else { None }).await; + check_stream_next_by_id( + &item, + if has_value { Some(false) } else { None }, + entry_id, + ) + .await; } 1 => { - check_stream_next(&item, Some(true)).await; + check_stream_next_by_id(&item, Some(true), entry_id).await; } 2 => { // As long as value stays as false we do not get anything new, so prepare for None - check_stream_next(&item, Some(false)).await; + check_stream_next_by_id(&item, Some(false), entry_id).await; } 3 => { - check_stream_next(&item, None).await; + check_stream_next_by_id(&item, None, entry_id).await; } 4 => { - check_stream_next(&item, Some(true)).await; + check_stream_next_by_id(&item, Some(true), entry_id).await; // And we do not expect more break; } - _ => assert!( - f, + _ => panic!( "You shouldn't land here too many items reported back to the stream." ), } @@ -2022,7 +2234,7 @@ mod tests { // Make sure stream is not closed in advance assert_eq!(item_count, 4); } else { - assert!(f, "Something went wrong while getting the stream.") + panic!("Something went wrong while getting the stream.") } } @@ -2030,6 +2242,8 @@ mod tests { async fn test_subscribe() { test_subscribe_case(false).await; test_subscribe_case(true).await; + test_subscribe_case_by_id(false).await; + test_subscribe_case_by_id(true).await; } /*