Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into erik_err
Browse files Browse the repository at this point in the history
  • Loading branch information
erikbosch committed Nov 20, 2024
2 parents aa1323a + fbbab49 commit 6aa6c25
Show file tree
Hide file tree
Showing 17 changed files with 328 additions and 211 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion databroker-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

[package]
name = "databroker-cli"
version = "0.4.7-dev.0"
version = "0.4.7-dev.1"
authors = ["Eclipse KUKSA Project"]
edition = "2021"
license = "Apache-2.0"
Expand Down
2 changes: 1 addition & 1 deletion databroker-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

[package]
name = "databroker-proto"
version = "0.4.7-dev.0"
version = "0.4.7-dev.1"
authors = ["Eclipse KUKSA Project"]
edition = "2021"
license = "Apache-2.0"
Expand Down
3 changes: 1 addition & 2 deletions databroker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

[package]
name = "databroker"
version = "0.4.7-dev.0"
version = "0.4.7-dev.1"
authors = ["Eclipse KUKSA Project"]
edition = "2021"
license = "Apache-2.0"
Expand Down Expand Up @@ -60,7 +60,6 @@ glob-match = "0.2.1"
jemallocator = { version = "0.5.0", optional = true }
lazy_static = "1.4.0"
thiserror = "1.0.47"

futures = { version = "0.3.28" }
async-trait = "0.1.82"

Expand Down
52 changes: 48 additions & 4 deletions databroker/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1642,8 +1642,10 @@ impl<'a, 'b> AuthorizedAccess<'a, 'b> {
if cap > MAX_SUBSCRIBE_BUFFER_SIZE {
return Err(SubscriptionError::InvalidBufferSize);
}
cap
// Requested capacity for old messages plus 1 for latest
cap + 1
} else {
// Just latest message
1
};

Expand Down Expand Up @@ -4239,8 +4241,7 @@ pub mod tests {
}
}

#[tokio::test]
async fn test_subscribe_and_get() {
async fn test_subscribe_and_get_buffer_size(buffer_size: Option<usize>) {
let broker = DataBroker::default();
let broker = broker.authorized_access(&permissions::ALLOW_ALL);

Expand All @@ -4262,7 +4263,7 @@ pub mod tests {
let mut stream = broker
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
None,
buffer_size,
)
.await
.expect("subscription should succeed");
Expand Down Expand Up @@ -4340,6 +4341,49 @@ pub mod tests {
}
}

#[tokio::test]
async fn test_subscribe_and_get() {
// None and 0-1000 is valid range
test_subscribe_and_get_buffer_size(None).await;
test_subscribe_and_get_buffer_size(Some(0)).await;
test_subscribe_and_get_buffer_size(Some(1000)).await;
}

#[tokio::test]
async fn test_subscribe_buffersize_out_of_range() {
let broker = DataBroker::default();
let broker = broker.authorized_access(&permissions::ALLOW_ALL);

let id1 = broker
.add_entry(
"test.datapoint1".to_owned(),
DataType::Int32,
ChangeType::OnChange,
EntryType::Sensor,
"Test datapoint 1".to_owned(),
None, // min
None, // max
None,
None,
)
.await
.expect("Register datapoint should succeed");

match broker
.subscribe(
HashMap::from([(id1, HashSet::from([Field::Datapoint]))]),
// 1001 is just outside valid range 0-1000
Some(1001),
)
.await
{
Err(SubscriptionError::InvalidBufferSize) => {}
_ => {
panic!("expected it to fail with InvalidBufferSize");
}
}
}

#[tokio::test]
async fn test_metadata_for_each() {
let db = DataBroker::default();
Expand Down
2 changes: 1 addition & 1 deletion databroker/src/grpc/kuksa_val_v1/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ impl proto::val_server::Val for broker::DataBroker {
}
}

match broker.subscribe(entries, Some(1)).await {
match broker.subscribe(entries, None).await {
Ok(stream) => {
let stream = convert_to_proto_stream(stream);
Ok(tonic::Response::new(Box::pin(stream)))
Expand Down
12 changes: 0 additions & 12 deletions databroker/src/grpc/kuksa_val_v2/val.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,6 @@ impl proto::val_server::Val for broker::DataBroker {
};

let request = request.into_inner();
if request.buffer_size == 0 {
return Err(tonic::Status::invalid_argument(format!(
"Provided buffer_size {} should be greater than zero.",
request.buffer_size
)));
}

let broker = self.authorized_access(&permissions);

Expand Down Expand Up @@ -301,12 +295,6 @@ impl proto::val_server::Val for broker::DataBroker {
};

let request = request.into_inner();
if request.buffer_size == 0 {
return Err(tonic::Status::invalid_argument(format!(
"Provided lag_buffer_capacity {} should be greater than zero.",
request.buffer_size
)));
}

let broker = self.authorized_access(&permissions);

Expand Down
79 changes: 52 additions & 27 deletions databroker/src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,15 @@

use std::{convert::TryFrom, future::Future, time::Duration};

use tokio::net::TcpListener;
use tokio_stream::wrappers::TcpListenerStream;
use tonic::transport::Server;
use futures::Stream;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{TcpListener, UnixListener},
};
use tokio_stream::wrappers::{TcpListenerStream, UnixListenerStream};
#[cfg(feature = "tls")]
use tonic::transport::ServerTlsConfig;
use tonic::transport::{server::Connected, Server};
use tracing::{debug, info};

use databroker_proto::{kuksa, sdv};
Expand All @@ -34,7 +38,7 @@ pub enum ServerTLS {
Enabled { tls_config: ServerTlsConfig },
}

#[derive(PartialEq)]
#[derive(PartialEq, Clone)]
pub enum Api {
KuksaValV1,
KuksaValV2,
Expand Down Expand Up @@ -96,7 +100,7 @@ where
databroker.shutdown().await;
}

pub async fn serve<F>(
pub async fn serve_tcp<F>(
addr: impl Into<std::net::SocketAddr>,
broker: broker::DataBroker,
#[cfg(feature = "tls")] server_tls: ServerTLS,
Expand All @@ -110,25 +114,14 @@ where
let socket_addr = addr.into();
let listener = TcpListener::bind(socket_addr).await?;

/* On Linux systems try to notify daemon readiness to systemd.
* This function determines whether the a system is using systemd
* or not, so it is safe to use on non-systemd systems as well.
*/
#[cfg(target_os = "linux")]
{
match sd_notify::booted() {
Ok(true) => {
info!("Notifying systemd that the service is ready");
sd_notify::notify(false, &[sd_notify::NotifyState::Ready])?;
}
_ => {
debug!("System is not using systemd, will not try to notify");
}
}
if let Ok(addr) = listener.local_addr() {
info!("Listening on {}", addr);
}

let incoming = TcpListenerStream::new(listener);

serve_with_incoming_shutdown(
listener,
incoming,
broker,
#[cfg(feature = "tls")]
server_tls,
Expand All @@ -139,23 +132,55 @@ where
.await
}

pub async fn serve_with_incoming_shutdown<F>(
listener: TcpListener,
pub async fn serve_uds<F>(
path: impl AsRef<std::path::Path>,
broker: broker::DataBroker,
#[cfg(feature = "tls")] server_tls: ServerTLS,
apis: &[Api],
authorization: Authorization,
signal: F,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Future<Output = ()>,
{
broker.start_housekeeping_task();
let listener = UnixListener::bind(path)?;

if let Ok(addr) = listener.local_addr() {
info!("Listening on {}", addr);
match addr.as_pathname() {
Some(pathname) => info!("Listening on unix socket at {}", pathname.display()),
None => info!("Listening on unix socket (unknown path)"),
}
}

let incoming = TcpListenerStream::new(listener);
let incoming = UnixListenerStream::new(listener);

serve_with_incoming_shutdown(
incoming,
broker,
ServerTLS::Disabled,
apis,
authorization,
signal,
)
.await
}

pub async fn serve_with_incoming_shutdown<F, I, IO, IE>(
incoming: I,
broker: broker::DataBroker,
#[cfg(feature = "tls")] server_tls: ServerTLS,
apis: &[Api],
authorization: Authorization,
signal: F,
) -> Result<(), Box<dyn std::error::Error>>
where
F: Future<Output = ()>,
I: Stream<Item = Result<IO, IE>>,
IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
IO::ConnectInfo: Clone + Send + Sync + 'static,
IE: Into<Box<dyn std::error::Error + Send + Sync>>,
{
broker.start_housekeeping_task();

let mut server = Server::builder()
.http2_keepalive_interval(Some(Duration::from_secs(10)))
.http2_keepalive_timeout(Some(Duration::from_secs(20)));
Expand Down
Loading

0 comments on commit 6aa6c25

Please sign in to comment.