Skip to content

Commit

Permalink
SubscribeById implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaeling committed Oct 22, 2024
1 parent 7b47ed9 commit b66ae28
Show file tree
Hide file tree
Showing 7 changed files with 309 additions and 39 deletions.
28 changes: 28 additions & 0 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ pub struct NotificationError {}

#[derive(Debug, Clone, Default)]
pub struct EntryUpdate {
pub id: Option<i32>,

pub path: Option<String>,

pub datapoint: Option<Datapoint>,
Expand Down Expand Up @@ -757,6 +759,7 @@ impl ChangeSubscription {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
// TODO: Perhaps make path optional
update.id = Some(entry.metadata.id);
update.path = Some(entry.metadata.path.clone());
if changed_fields.contains(&Field::Datapoint)
&& fields.contains(&Field::Datapoint)
Expand Down Expand Up @@ -813,6 +816,7 @@ impl ChangeSubscription {
let mut update = EntryUpdate::default();
let mut notify_fields = HashSet::new();
// TODO: Perhaps make path optional
update.id = Some(entry.metadata.id);
update.path = Some(entry.metadata.path.clone());
if fields.contains(&Field::Datapoint) {
update.datapoint = Some(entry.datapoint.clone());
Expand Down Expand Up @@ -2010,6 +2014,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: time1,
Expand Down Expand Up @@ -2042,6 +2047,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: time1,
Expand All @@ -2064,6 +2070,7 @@ mod tests {
.update_entries([(
id2,
EntryUpdate {
id: Some(id2),
path: None,
datapoint: None,
actuator_target: Some(Some(Datapoint {
Expand Down Expand Up @@ -2136,6 +2143,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
Expand All @@ -2162,6 +2170,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: None,
actuator_target: None,
Expand All @@ -2184,6 +2193,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: None,
actuator_target: None,
Expand All @@ -2203,6 +2213,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: time1,
Expand Down Expand Up @@ -2274,6 +2285,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
Expand Down Expand Up @@ -2376,6 +2388,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
Expand Down Expand Up @@ -2459,6 +2472,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
Expand Down Expand Up @@ -2518,6 +2532,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
Expand Down Expand Up @@ -2616,6 +2631,7 @@ mod tests {
(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
Expand All @@ -2633,6 +2649,7 @@ mod tests {
(
id2,
EntryUpdate {
id: Some(id2),
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
Expand Down Expand Up @@ -2696,6 +2713,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -2761,6 +2779,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -2841,6 +2860,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -2897,6 +2917,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -2952,6 +2973,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -3004,6 +3026,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -3033,6 +3056,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -3090,6 +3114,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -3119,6 +3144,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -3179,6 +3205,7 @@ mod tests {
.update_entries([(
id,
EntryUpdate {
id: Some(id),
path: None,
datapoint: Some(Datapoint {
ts,
Expand Down Expand Up @@ -3266,6 +3293,7 @@ mod tests {
.update_entries([(
id1,
EntryUpdate {
id: Some(id1),
path: None,
datapoint: Some(Datapoint {
ts: SystemTime::now(),
Expand Down
1 change: 1 addition & 0 deletions databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,7 @@ impl broker::EntryUpdate {
None
};
Self {
id: None,
path: None,
datapoint,
actuator_target,
Expand Down
Loading

0 comments on commit b66ae28

Please sign in to comment.