Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ErrorRequestHandler #4303

Merged
merged 10 commits into from
Mar 14, 2019
199 changes: 96 additions & 103 deletions src/coprocessor/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use crate::util::Either;
use crate::coprocessor::dag::executor::ExecutorMetrics;
use crate::coprocessor::metrics::*;
use crate::coprocessor::tracker::Tracker;
use crate::coprocessor::util as cop_util;
use crate::coprocessor::*;

const OUTDATED_ERROR_MSG: &str = "request outdated.";
Expand Down Expand Up @@ -83,12 +82,16 @@ impl<E: Engine> Endpoint<E> {

/// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`.
/// Returns `Err` if fails.
fn try_parse_request(
fn parse_request(
&self,
mut req: coppb::Request,
peer: Option<String>,
is_streaming: bool,
) -> Result<(RequestHandlerBuilder<E::Snap>, ReqContext)> {
fail_point!("coprocessor_parse_request", |_| Err(box_err!(
"unsupported tp (failpoint)"
)));

let (context, data, ranges) = (
req.take_context(),
req.take_data(),
Expand Down Expand Up @@ -187,34 +190,6 @@ impl<E: Engine> Endpoint<E> {
Ok((builder, req_ctx))
}

/// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`.
#[inline]
fn parse_request(
&self,
req: coppb::Request,
peer: Option<String>,
is_streaming: bool,
) -> (RequestHandlerBuilder<E::Snap>, ReqContext) {
match self.try_parse_request(req, peer, is_streaming) {
Ok(v) => v,
Err(err) => {
// If there are errors when parsing requests, create a dummy request handler.
let builder =
Box::new(|_, _: &_| Ok(cop_util::ErrorRequestHandler::new(err).into_boxed()));
let req_ctx = ReqContext::new(
"invalid",
kvrpcpb::Context::new(),
&[],
Duration::from_secs(60), // Large enough to avoid becoming outdated error
None,
None,
None,
);
(builder, req_ctx)
}
}
}

/// Get the batch row limit configuration.
#[inline]
fn get_batch_row_limit(&self, is_streaming: bool) -> usize {
Expand Down Expand Up @@ -293,39 +268,46 @@ impl<E: Engine> Endpoint<E> {
})
}

/// Handle a unary request and run on the read pool. Returns a future producing the
/// result, which must be a `Response` and will never fail. If there are errors during
/// handling, they will be embedded in the `Response`.
/// Handle a unary request and run on the read pool.
///
/// Returns `Err(err)` if the read pool is full. Returns `Ok(future)` in other cases.
/// The future inside may be an error however.
fn handle_unary_request(
&self,
req_ctx: ReqContext,
handler_builder: RequestHandlerBuilder<E::Snap>,
) -> impl Future<Item = coppb::Response, Error = ()> {
) -> Result<impl Future<Item = coppb::Response, Error = Error>> {
let engine = self.engine.clone();
let priority = readpool::Priority::from(req_ctx.context.get_priority());
// box the tracker so that moving it is cheap.
let mut tracker = Box::new(Tracker::new(req_ctx));

let result = self.read_pool.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

Self::handle_unary_request_impl(engine, tracker, handler_builder)
});

future::result(result)
// If the read pool is full, an error response will be returned directly.
self.read_pool
.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);
Self::handle_unary_request_impl(engine, tracker, handler_builder)
})
.map_err(|_| Error::Full)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's correct anymore. What if the returned error is a Error::Region?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@BusyJay future_execute itself returns Result<impl Future>. The map_err here only maps the out most Result, not the inner one. For the out most result, it can be only caused by read pool full. Does it solve your concern?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not impl From for the error returned by read_pool? So no one has the concern anymore.

Copy link
Member Author

@breezewish breezewish Mar 6, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This solution was not accepted at that time. Reviewers said that it is not good to mix the sync part into the async interface.

.flatten()
.or_else(|e| Ok(make_error_response(e)))
}

/// Parses and handles a unary request. Returns a future that will never fail. If there are
/// errors during parsing or handling, they will be converted into a `Response` as the success
/// result of the future.
#[inline]
pub fn parse_and_handle_unary_request(
&self,
req: coppb::Request,
peer: Option<String>,
) -> impl Future<Item = coppb::Response, Error = ()> {
let (handler_builder, req_ctx) = self.parse_request(req, peer, false);
self.handle_unary_request(req_ctx, handler_builder)
let result_of_future =
self.parse_request(req, peer, false)
.and_then(|(handler_builder, req_ctx)| {
self.handle_unary_request(req_ctx, handler_builder)
});

future::result(result_of_future)
.flatten()
.or_else(|e| Ok(make_error_response(e)))
}

/// The real implementation of handling a stream request.
Expand Down Expand Up @@ -422,62 +404,57 @@ impl<E: Engine> Endpoint<E> {
.flatten_stream()
}

/// Handle a stream request and run on the read pool. Returns a stream producing each
/// result, which must be a `Response` and will never fail. If there are errors during
/// handling, they will be embedded in the `Response`.
/// Handle a stream request and run on the read pool.
///
/// Returns `Err(err)` if the read pool is full. Returns `Ok(stream)` in other cases.
/// The stream inside may produce errors however.
fn handle_stream_request(
&self,
req_ctx: ReqContext,
handler_builder: RequestHandlerBuilder<E::Snap>,
) -> impl Stream<Item = coppb::Response, Error = ()> {
let (tx, rx) = mpsc::channel::<coppb::Response>(self.stream_channel_size);
) -> Result<impl Stream<Item = coppb::Response, Error = Error>> {
let (tx, rx) = mpsc::channel::<Result<coppb::Response>>(self.stream_channel_size);
let engine = self.engine.clone();
let priority = readpool::Priority::from(req_ctx.context.get_priority());
// Must be created befure `future_execute`, otherwise wait time is not tracked.
let mut tracker = Box::new(Tracker::new(req_ctx));

let tx1 = tx.clone();
let result = self.read_pool.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

Self::handle_stream_request_impl(engine, tracker, handler_builder)
.or_else(|e| Ok::<_, mpsc::SendError<_>>(make_error_response(e)))
// Although returning `Ok()` from `or_else` will continue the stream,
// our stream has already ended when error is returned.
// Thus the stream will not continue any more even after we converting errors
// into a response.
.forward(tx1)
});
self.read_pool
.future_execute(priority, move |ctxd| {
tracker.attach_ctxd(ctxd);

match result {
Err(_) => {
stream::once::<_, mpsc::SendError<_>>(Ok(make_error_response(Error::Full)))
Self::handle_stream_request_impl(engine, tracker, handler_builder) // Stream<Resp, Error>
.then(Ok::<_, mpsc::SendError<_>>) // Stream<Result<Resp, Error>, MpscError>
.forward(tx)
.then(|_| {
// ignore sink send failures
Ok::<_, ()>(())
})
// Should not be blocked, since the channel is large enough to hold 1 value.
.wait()
.unwrap();
}
Ok(cpu_future) => {
})
.map_err(|_| Error::Full)
.and_then(move |cpu_future| {
// Keep running stream producer
cpu_future.forget();
}
}

rx
// Returns the stream instead of a future
Ok(rx.then(|r| r.unwrap()))
})
}

/// Parses and handles a stream request. Returns a stream that produce each result in a
/// `Response` and will never fail. If there are errors during parsing or handling, they will
/// be converted into a `Response` as the only stream item.
#[inline]
pub fn parse_and_handle_stream_request(
&self,
req: coppb::Request,
peer: Option<String>,
) -> impl Stream<Item = coppb::Response, Error = ()> {
let (handler_builder, req_ctx) = self.parse_request(req, peer, true);
self.handle_stream_request(req_ctx, handler_builder)
let result_of_stream =
self.parse_request(req, peer, true)
.and_then(|(handler_builder, req_ctx)| {
self.handle_stream_request(req_ctx, handler_builder)
}); // Result<Stream<Resp, Error>, Error>

stream::once(result_of_stream) // Stream<Stream<Resp, Error>, Error>
.flatten() // Stream<Resp, Error>
.or_else(|e| Ok(make_error_response(e))) // Stream<Resp, ()>
}
}

Expand Down Expand Up @@ -676,6 +653,7 @@ mod tests {
Box::new(|_, _: &_| Ok(UnaryFixture::new(Ok(coppb::Response::new())).into_boxed()));
let resp = cop
.handle_unary_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.wait()
.unwrap();
assert!(resp.get_other_error().is_empty());
Expand All @@ -692,11 +670,11 @@ mod tests {
None,
None,
);
let resp = cop
assert!(cop
.handle_unary_request(outdated_req_ctx, handler_builder)
.unwrap()
.wait()
.unwrap();
assert_eq!(resp.get_other_error(), OUTDATED_ERROR_MSG);
.is_err());
}

#[test]
Expand Down Expand Up @@ -806,25 +784,28 @@ mod tests {
let handler_builder = Box::new(|_, _: &_| {
Ok(UnaryFixture::new_with_duration(Ok(response), 1000).into_boxed())
});
let future = cop.handle_unary_request(ReqContext::default_for_test(), handler_builder);
let tx = tx.clone();
thread::spawn(move || tx.send(future.wait().unwrap()));
let result_of_future =
cop.handle_unary_request(ReqContext::default_for_test(), handler_builder);
match result_of_future {
Err(full_error) => {
tx.send(Err(full_error)).unwrap();
}
Ok(future) => {
let tx = tx.clone();
thread::spawn(move || {
tx.send(future.wait()).unwrap();
});
}
}
thread::sleep(Duration::from_millis(100));
}

// verify
for _ in 2..5 {
let resp: coppb::Response = rx.recv().unwrap();
assert_eq!(resp.get_data().len(), 0);
assert!(resp.has_region_error());
assert!(resp.get_region_error().has_server_is_busy());
assert_eq!(
resp.get_region_error().get_server_is_busy().get_reason(),
BUSY_ERROR_MSG
);
assert!(rx.recv().unwrap().is_err());
}
for i in 0..2 {
let resp = rx.recv().unwrap();
let resp = rx.recv().unwrap().unwrap();
assert_eq!(resp.get_data(), [1, 2, i]);
assert!(!resp.has_region_error());
}
Expand All @@ -844,6 +825,7 @@ mod tests {
});
let resp = cop
.handle_unary_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.wait()
.unwrap();
assert_eq!(resp.get_data().len(), 0);
Expand All @@ -865,6 +847,7 @@ mod tests {
});
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand All @@ -884,6 +867,7 @@ mod tests {
let handler_builder = Box::new(|_, _: &_| Ok(StreamFixture::new(responses).into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand All @@ -907,6 +891,7 @@ mod tests {
let handler_builder = Box::new(|_, _: &_| Ok(StreamFixture::new(vec![]).into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand Down Expand Up @@ -942,6 +927,7 @@ mod tests {
let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand All @@ -967,6 +953,7 @@ mod tests {
let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand All @@ -992,6 +979,7 @@ mod tests {
let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.collect()
.wait()
.unwrap();
Expand Down Expand Up @@ -1029,6 +1017,7 @@ mod tests {
let handler_builder = Box::new(move |_, _: &_| Ok(handler.into_boxed()));
let resp_vec = cop
.handle_stream_request(ReqContext::default_for_test(), handler_builder)
.unwrap()
.take(7)
.collect()
.wait()
Expand Down Expand Up @@ -1086,8 +1075,9 @@ mod tests {
)
.into_boxed())
});
let resp_future_1 =
cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_1 = cop
.handle_unary_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap());
// Sleep a while to make sure that thread is spawn and snapshot is taken.
Expand All @@ -1100,8 +1090,9 @@ mod tests {
.into_boxed(),
)
});
let resp_future_2 =
cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_2 = cop
.handle_unary_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || sender.send(vec![resp_future_2.wait().unwrap()]).unwrap());
thread::sleep(Duration::from_millis(SNAPSHOT_DURATION_MS as u64));
Expand Down Expand Up @@ -1159,8 +1150,9 @@ mod tests {
)
.into_boxed())
});
let resp_future_1 =
cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_1 = cop
.handle_unary_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap());
// Sleep a while to make sure that thread is spawn and snapshot is taken.
Expand All @@ -1182,8 +1174,9 @@ mod tests {
)
.into_boxed())
});
let resp_future_3 =
cop.handle_stream_request(req_with_exec_detail.clone(), handler_builder);
let resp_future_3 = cop
.handle_stream_request(req_with_exec_detail.clone(), handler_builder)
.unwrap();
let sender = tx.clone();
thread::spawn(move || {
sender
Expand Down
Loading