diff --git a/example/async-stream-client.rs b/example/async-stream-client.rs index 04f8c7b..9e3ec21 100644 --- a/example/async-stream-client.rs +++ b/example/async-stream-client.rs @@ -44,9 +44,12 @@ async fn main() { let sc1 = sc.clone(); let t6 = tokio::spawn(echo_null_stream(sc1)); - let t7 = tokio::spawn(echo_default_value(sc)); + let sc1 = sc.clone(); + let t7 = tokio::spawn(echo_default_value(sc1)); + + let t8 = tokio::spawn(server_send_stream(sc)); - let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7); + let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7, t8); } fn default_ctx() -> Context { @@ -201,3 +204,18 @@ async fn echo_default_value(cli: streaming_ttrpc::StreamingClient) { assert_eq!(received.seq, 0); assert_eq!(received.msg, ""); } + +#[cfg(unix)] +async fn server_send_stream(cli: streaming_ttrpc::StreamingClient) { + let mut stream = cli + .server_send_stream(default_ctx(), &Default::default()) + .await + .unwrap(); + + let mut seq = 0; + while let Some(received) = stream.recv().await.unwrap() { + assert_eq!(received.seq, seq); + assert_eq!(received.msg, "hello"); + seq += 1; + } +} diff --git a/example/async-stream-server.rs b/example/async-stream-server.rs index 4dcba88..bd70935 100644 --- a/example/async-stream-server.rs +++ b/example/async-stream-server.rs @@ -152,6 +152,25 @@ impl streaming_ttrpc::Streaming for StreamingService { Ok(()) } + + async fn server_send_stream( + &self, + _ctx: &::ttrpc::r#async::TtrpcContext, + _: empty::Empty, + s: ::ttrpc::r#async::ServerStreamSender, + ) -> ::ttrpc::Result<()> { + let mut seq = 0; + while seq < 10 { + sleep(std::time::Duration::from_secs(1)).await; + let mut e = streaming::EchoPayload::new(); + e.seq = seq; + e.msg = format!("hello"); + s.send(&e).await.unwrap(); + seq += 1; + } + + Ok(()) + } } #[cfg(windows)] diff --git a/example/protocols/protos/streaming.proto b/example/protocols/protos/streaming.proto index 3aa7d85..8a32cd6 100644 --- a/example/protocols/protos/streaming.proto +++ b/example/protocols/protos/streaming.proto @@ -33,6 +33,7 @@ service Streaming { rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty); rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty); rpc EchoDefaultValue(EchoPayload) returns (stream EchoPayload); + rpc ServerSendStream(google.protobuf.Empty) returns (stream EchoPayload); } message EchoPayload {