From bbf1ed8e392be2a68f697ce8b8d558acf71d68ad Mon Sep 17 00:00:00 2001 From: hanhxiao Date: Sun, 29 Sep 2019 18:59:02 +0800 Subject: [PATCH] feat(frontend): add max pending request to frontend --- gnes/service/frontend.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/gnes/service/frontend.py b/gnes/service/frontend.py index 142a7a1b..215f85ef 100644 --- a/gnes/service/frontend.py +++ b/gnes/service/frontend.py @@ -58,6 +58,7 @@ def __init__(self, args): check_version=self.args.check_version, timeout=self.args.timeout, squeeze_pb=self.args.squeeze_pb) + self.pending_request = 0 def add_envelope(self, body: 'gnes_pb2.Request', zmq_client: 'ZmqClient'): msg = gnes_pb2.Message() @@ -103,24 +104,27 @@ def Search(self, request, context): return self.Call(request, context) def StreamCall(self, request_iterator, context): + self.pending_request = 0 + + def get_response(num_recv, blocked=False): + for _ in range(num_recv): + if blocked or zmq_client.receiver.poll(1): + msg = zmq_client.recv_message(**self.send_recv_kwargs) + self.pending_request -= 1 + yield self.remove_envelope(msg) + with self.zmq_context as zmq_client: - pending_request = 0 for request in request_iterator: zmq_client.send_message(self.add_envelope(request, zmq_client), **self.send_recv_kwargs) - pending_request += 1 - - while pending_request > self.args.max_pending_request: - # too many pending requests, the whole network is pretty busy - # slow down the sending rate by waiting responses - if zmq_client.receiver.poll(1): - msg = zmq_client.recv_message(**self.send_recv_kwargs) - pending_request -= 1 - yield self.remove_envelope(msg) - - for _ in range(pending_request): - msg = zmq_client.recv_message(**self.send_recv_kwargs) - yield self.remove_envelope(msg) + self.pending_request += 1 + + num_recv = max(self.pending_request - self.args.max_pending_request, 1) + + # switch to blocked recv when too many pending requests + yield from get_response(num_recv, num_recv > 1) + + yield from get_response(self.pending_request, blocked=True) class ZmqContext: """The zmq context class."""