Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(streaming): permit-based back-pressure in exchange #6170

Merged
merged 11 commits into from
Nov 7, 2022
Merged
174 changes: 131 additions & 43 deletions dashboard/proto/gen/task_service.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 21 additions & 8 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,6 @@ message GetDataResponse {
data.DataChunk record_batch = 2;
}

message GetStreamRequest {
uint32 up_actor_id = 1;
uint32 down_actor_id = 2;
uint32 up_fragment_id = 3;
uint32 down_fragment_id = 4;
}

message ExecuteRequest {
batch_plan.TaskId task_id = 1;
batch_plan.PlanFragment plan = 2;
Expand All @@ -82,11 +75,31 @@ message GetDataRequest {
batch_plan.TaskOutputId task_output_id = 1;
}

message GetStreamRequest {
// The first message, which tells the upstream which channel this exchange stream is for.
message Get {
uint32 up_actor_id = 1;
uint32 down_actor_id = 2;
uint32 up_fragment_id = 3;
uint32 down_fragment_id = 4;
}
// The following messages, which adds the permits back to the upstream to achieve back-pressure.
message AddPermits {
uint32 permits = 1;
}
oneof value {
Get get = 1;
AddPermits add_permits = 2;
}
}

message GetStreamResponse {
stream_plan.StreamMessage message = 1;
// The number of permits acquired for this message, which should be sent back to the upstream with `AddPermits`.
uint32 permits = 2;
}

service ExchangeService {
rpc GetData(GetDataRequest) returns (stream GetDataResponse);
rpc GetStream(GetStreamRequest) returns (stream GetStreamResponse);
rpc GetStream(stream GetStreamRequest) returns (stream GetStreamResponse);
}
4 changes: 2 additions & 2 deletions src/batch/src/execution/local_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ mod tests {
use risingwave_rpc_client::ComputeClient;
use tokio::time::sleep;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tonic::{Request, Response, Status, Streaming};

use crate::exchange_source::ExchangeSource;
use crate::execution::grpc_exchange::GrpcExchangeSource;
Expand Down Expand Up @@ -128,7 +128,7 @@ mod tests {

async fn get_stream(
&self,
_request: Request<GetStreamRequest>,
_request: Request<Streaming<GetStreamRequest>>,
) -> Result<Response<Self::GetStreamStream>, Status> {
unimplemented!()
}
Expand Down
6 changes: 5 additions & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ use serde::{Deserialize, Serialize};
use crate::error::ErrorCode::InternalError;
use crate::error::{Result, RwError};

/// Use the maximum value for HTTP/2 connection window size to avoid deadlock among multiplexed
/// streams on the same connection.
pub const MAX_CONNECTION_WINDOW_SIZE: u32 = (1 << 31) - 1;
pub const STREAM_WINDOW_SIZE: u32 = 65535;
/// Use a large value for HTTP/2 stream window size to improve the performance of remote exchange,
/// as we don't rely on this for back-pressure.
pub const STREAM_WINDOW_SIZE: u32 = 32 * 1024 * 1024; // 32 MB

pub fn load_config<S>(path: &str) -> Result<S>
where
Expand Down
Loading