python grpc server with multiprocessing fails #16001
Comments
@ericgribkoff is this a supported use-case? |
No, this is not supported.
@p1c2u Thanks for pointing this out. I see that our documentation could be a bit confusing if you're looking at using fork with gRPC servers: we should update that document to make it more clear that the fork-support there is for client-side usage. (cc @kpayson64) |
[PYTHON] @ericgribkoff, I have run into a use case where I am using gRPC for a server-side CPU-bound task (and need to use Python multiprocessing, as there isn't another option in pure Python). Can you please suggest a solution here, since I see no support for fork on the server side? I am facing the same issue. @evanj How did you deal with #15334 (comment)? |
@ericgribkoff , @evanj : I'm facing the same issue as @mohit-chawla |
+1 |
The following two approaches should help with combining gRPC Python servers and the multiprocessing module. Option 1: If your environment supports the SO_REUSEPORT socket option, you can run multiple copies of your gRPC server in individual processes started via multiprocessing.Process, all bound to the same port. This type of pre-fork + SO_REUSEPORT setup would look something like the following:
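A minimal sketch of this pre-fork + SO_REUSEPORT pattern, assuming a hypothetical widget service (widget_pb2_grpc and WidgetServicer are placeholders for your own generated code and handlers):

from concurrent import futures
import multiprocessing
import time

import grpc

import widget_pb2_grpc  # hypothetical generated module for the widget service


class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    pass  # real request handlers go here


def run_server(bind_address):
    # Each worker process builds its own server *after* the fork; the parent
    # never touches gRPC, so the fork restrictions are not violated.
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=1),
        options=(('grpc.so_reuseport', 1),),  # all processes share one listening port
    )
    widget_pb2_grpc.add_WidgetServicer_to_server(WidgetServicer(), server)
    server.add_insecure_port(bind_address)
    server.start()
    while True:
        time.sleep(3600)


if __name__ == '__main__':
    procs = []
    for _ in range(multiprocessing.cpu_count()):
        p = multiprocessing.Process(target=run_server, args=('[::]:50051',))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

The key point is ordering: the processes are started first, and each child creates its own server; nothing gRPC-related runs in the parent.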
Option 2: Run a single gRPC Python server, but offload all CPU-intensive work to a multiprocessing.Pool. This would look like the following:
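A sketch under the same assumptions (hypothetical widget service; the Value and Widget field names mirror the pebble example later in this thread):

from concurrent import futures
import multiprocessing
import time

import grpc

import widget_pb2        # hypothetical generated modules for the widget service
import widget_pb2_grpc


def cpu_bound_work(value):
    return value * 2  # stand-in for the real CPU-intensive computation


class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    def __init__(self, pool):
        self.pool = pool

    def WidgetRequest(self, request, context):
        # The handler thread blocks on the pool result; the heavy lifting happens
        # in a worker process, so the GIL in the server process stays available.
        result = self.pool.apply_async(cpu_bound_work, (request.Value,))
        response = widget_pb2.Response()
        response.Widget = result.get(timeout=context.time_remaining())
        return response


if __name__ == '__main__':
    # Create the pool before instantiating the server so that the fork happens
    # before any gRPC state exists in this process.
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=8))
    widget_pb2_grpc.add_WidgetServicer_to_server(WidgetServicer(pool), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    while True:
        time.sleep(3600)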
|
Did this behaviour change recently? I have a grpc server invoking tensorflow, but tensorflow doesn't have any way to explicitly let go of GPU memory. Thus my solution, when receiving a grpc request, was to spawn a new process to do the work, then let it terminate after the work was done. I was pretty sure I had this working with grpc earlier in 2018, but now I get an error. (I can work around it in an awkward way, but I'd like to clarify if I'm going mad.) |
@ferrouswheel gRPC servers that fork after the server has started were never supported, so any earlier success was not guaranteed behavior. However, if you were spawning a new process via fork+exec (for example with the subprocess module) rather than a bare fork, that should still work. |
@ericgribkoff Thank you for the clarification. I must have just got lucky with a prior combination of factors. |
@ericgribkoff Do you mean that if we build an executor that preforks before the grpc server starts, then it would work? |
@yifeikong Not exactly: you will need to prefork before any calls into gRPC, including instantiation of the server. |
@ericgribkoff any chance you can flesh out a more concrete example for multiprocessing.Pool? I'm not seeing how one could offload incoming calls to the pool (i.e. in your example, how would |
@jshlbrd I think Eric's option 1 answers your question better. You start a gRPC server in the process worker function, and the incoming traffic will be automatically distributed due to SO_REUSEPORT. |
@lidizheng -- thanks, yeah that's what I thought, and I was able to get something like that working for my application. However, that spins up a gRPC server in each child process, I was curious if it's possible to run one gRPC server from the main process and distribute the calls to a group of child processes. My main use for this is to support use of signals -- you can't use signals from within threads, but you can from within processes. |
@jshlbrd Unfortunately, the fork style you are talking about is currently not supported for the gRPC Python server. Fork support is currently only available for the gRPC Python client, and it is actually harder than people expect. We use a C extension and perform a lot of I/O without the GIL across multiple threads with different responsibilities, and it is challenging to prevent CPython from deadlocking itself. |
It's worth noting that our |
I'd like to share this as a proposed workaround for this problem and seek feedback on this technique. Here's a proof-of-concept for how I've got this working (warning, some pseudo-code is present here):

from concurrent import futures
from concurrent.futures import TimeoutError
import time

import grpc
import pebble

import widget_pb2        # generated protobuf/gRPC modules for the example widget service
import widget_pb2_grpc


def make_widget(i):
    return i * 2


class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    def __init__(self, pool):
        self.pool = pool

    def WidgetRequest(self, request, context):
        response = widget_pb2.Response()
        future = self.pool.schedule(make_widget, args=[request.Value], timeout=context.time_remaining())
        try:
            response.Widget = future.result()
        except TimeoutError:
            context.abort(grpc.StatusCode.DEADLINE_EXCEEDED, 'Timeout')  # this handles process timeout
        except pebble.common.ProcessExpired:
            context.abort(grpc.StatusCode.RESOURCE_EXHAUSTED, 'Abnormal termination')  # this handles process crashes
        return response


def main():
    processes = 4  # keep the process and thread counts equal (see the explanation below)
    tasks = 100    # recycle each worker process after this many tasks
    pool = pebble.ProcessPool(max_workers=processes, max_tasks=tasks)
    executor = futures.ThreadPoolExecutor(max_workers=processes)
    server = grpc.server(executor)
    servicer = WidgetServicer(pool)
    # <start your server and block forever>
    widget_pb2_grpc.add_WidgetServicer_to_server(servicer, server)
    server.add_insecure_port('127.0.0.1:8443')
    server.start()
    while True:
        time.sleep(3600)


if __name__ == '__main__':
    main()

Here's how I think this is working: when you set both the ProcessPool and ThreadPoolExecutor to the same count, it locks the gRPC servicer into only feeding requests to the number of child processes that are available. If you were to increase the ThreadPoolExecutor count, then requests would queue in the ProcessPool, creating a backlog of work that might never be delivered on; keeping the counts the same, the gRPC servicer maintains control over the life of each request and never queues requests in the ProcessPool (if the number of requests exhausts the number of available processes, then eventually the client would get a timeout). There's also some error checking for timeouts and process crashes in the ProcessPool. |
@jshlbrd That's exactly right. To be precise, your maximum concurrency is min(thread count, process count). Generally, you'll want the process count to match your processor count, and you'll want your thread count to be equal to or higher than that (but not too much higher, or you'll get thrashing from context switching). This will buy you the maximum possible concurrency. It's worth noting that if the work done in your handler releases the GIL (e.g. by using a C extension that does its computation outside the GIL), you may not need subprocesses at all, since threads alone can then make use of multiple cores. |
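As a small illustration of that last point, here is a minimal sketch (an assumption, not code from this thread): numpy and the widget field names are stand-ins, but with a BLAS-backed numpy the matrix product releases the GIL while it runs, so a plain ThreadPoolExecutor can keep several cores busy without subprocesses.

import numpy as np

import widget_pb2        # hypothetical generated modules for the widget service
import widget_pb2_grpc


class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    def WidgetRequest(self, request, context):
        # Assumes the payload is a flat float64 array with 8 columns.
        # The conversion and the matrix product both run in C; the product
        # drops the GIL while it computes.
        data = np.frombuffer(request.widget, dtype=np.float64).reshape(-1, 8)
        gram = data.T @ data
        response = widget_pb2.Response()
        response.Widget = gram.tobytes()
        return response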
Woohoo, thanks for the feedback! You're correct with regard to possibly not needing subprocesses. In my case, the tasks being performed by the gRPC server are CPU-intensive and can range in time from sub-second to several minutes -- that's why I need to use subprocesses that can be timed out. |
Hopefully, this example will serve as a good reference for anyone who runs across this in the future. |
That’s a good example of utilizing multiple cores when your server isn’t running CPU-intensive tasks, but I think it still may not address the need to natively support process pools and thread pools. For example, when using that example as a template for CPU-intensive tasks, you could reduce the number of threads inside each gRPC servicer to one, but because each sub process is running a different server, they aren’t working from a shared RPC queue and (IIRC) reuseport appears to randomly select one of the connected servers — depending on your application, this can lead to uneven balancing across cores. Probably goes without saying that no one should think of reuseport as anything more than a simple load balancer, but the value of a single RPC queue is lost in that example. Edit: Reading into reuseport more, it uses IP/port hashing to distribute packets to the servers. I’m curious if that means that any of these gRPC servers behind a load balancer (like Envoy) would lead to uneven distribution. I haven’t tested this, but it’s leading me to believe that a more stable solution may be to use a local load balancer and spin up multiple gRPC servers on different ports? |
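For reference, a bare-bones sketch of that distinct-ports idea (my own illustration, untested): each child process binds its own port, and an external proxy such as Envoy or nginx, configured separately, balances across them. The widget service modules are again placeholders.

from concurrent import futures
import multiprocessing
import time

import grpc

import widget_pb2_grpc  # hypothetical generated module for the widget service


class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    pass  # real request handlers go here


def serve(port):
    # One independent server per process, each on its own port; the balancer
    # in front decides which backend gets each RPC.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=1))
    widget_pb2_grpc.add_WidgetServicer_to_server(WidgetServicer(), server)
    server.add_insecure_port('127.0.0.1:%d' % port)
    server.start()
    while True:
        time.sleep(3600)


if __name__ == '__main__':
    base_port = 50051  # hypothetical; list these ports as upstreams in the proxy config
    procs = [multiprocessing.Process(target=serve, args=(base_port + i,)) for i in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()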
Based on more testing, I'm now convinced that pre-forking via subprocess pools isn't a viable method for handling this (too many failure scenarios to deal with). Here's a summary of techniques that can work with their pitfalls: Pools (including multiprocessing.Pool, billiard.Pool): Processes w/ SO_REUSEPORT: Processes w/o SO_REUSEPORT: |
@gnossen any feedback on using the file system to communicate tasks between gRPC threads and subprocesses? I don't know the gRPC internals well enough to know if post-forking subprocesses that do not directly interact with the gRPC service would cause problems in this scenario. Here's some pseudo-code ...

from concurrent import futures
import multiprocessing
import os
import signal
import time
import uuid

import grpc

import widget_pb2        # generated protobuf/gRPC modules for the example widget service
import widget_pb2_grpc

TMP_REQ = '/path/to/your/tmp/requests/'
TMP_RESP = '/path/to/your/tmp/responses/'


class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    def __init__(self, queue):
        self.queue = queue

    def StreamWidget(self, request_iterator, context):
        resp = widget_pb2.Response()
        uid = uuid.uuid4().hex
        tmp_req = os.path.join(TMP_REQ, uid)
        tmp_resp = os.path.join(TMP_RESP, uid)
        with open(tmp_req, 'wb') as f:
            for request in request_iterator:
                f.write(request.widget)
        self.queue.put(
            {'tmp_req': tmp_req,
             'tmp_resp': tmp_resp,
             'timeout': context.time_remaining()},
        )
        while context.is_active():
            if os.path.isfile(tmp_resp):
                with open(tmp_resp, 'rb') as f:
                    resp.widget = f.read()
                os.remove(tmp_resp)
                return resp
            time.sleep(0.1)


run = 1


def main():
    def handler(sig, frame):
        global run
        run = 0

    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)
    q = multiprocessing.Queue()
    workers = []
    for _ in range(4):
        p = Worker(q)  # multiprocessing.Process subclass or some other kind of subproc
        p.start()
        workers.append(p)
    executor = futures.ThreadPoolExecutor(max_workers=8)
    server = grpc.server(executor,
                         maximum_concurrent_rpcs=100)
    widget_pb2_grpc.add_WidgetServicer_to_server(WidgetServicer(q), server)
    server.add_insecure_port('127.0.0.1:8443')
    server.start()
    while run:
        for p in list(workers):
            if not p.is_alive():
                p.join()
                workers.remove(p)
                p = Worker(q)  # multiprocessing.Process subclass or some other kind of subproc
                p.start()
                workers.append(p)
        time.sleep(5)
    stop = server.stop(10)
    stop.wait()
    for p in list(workers):
        p.shutdown()  # Worker is assumed to implement shutdown(); a plain Process would use terminate()
        p.join()


if __name__ == '__main__':
    main()

I have code that follows this pattern that appears to work, but I'd like to get some confirmation that this design doesn't cause issues with the gRPC internals. |
@jshlbrd Communicating through files will result in race conditions unless you are using an RWLock or a similar synchronization mechanism. In other words, you may read incomplete data. Also, if you want to start a new Worker in the second half, using subprocess might be a better choice. PS. A couple of days ago, @gnossen confirmed that SO_REUSEPORT does not evenly distribute the workload across processes. Thank you for pointing that out. |
Thanks for the info! I think for my app synchronization is handled by sequencing, but it'd be helpful if you could double check my logic here ... the request is written to a unique file by the RPC, a pointer to that (now closed) request file is placed in a multiprocessing queue, and the RPC sits in a while loop waiting for a response file to be written; at the same time, worker processes run continuously trying to pick tasks off the queue: they pick a task, read/delete the request file, process the task, and write the result to the response file pre-determined by the RPC. After the RPC reads the response file, it deletes it. The RPC and workers should never touch the files while each is being operated over.
The risk that needs to be accounted for is what happens when an RPC times out, since this could leave dangling files. However, that could be handled by a separate process (e.g. delete any files that haven't been modified for N seconds/minutes).
Convoluted, but it can work if the gRPC servicer is OK with processes spawning after the service is started; that's my main concern. The only direct communication between the gRPC threads and the processes is via the queue.
Update: Thought about this a bit more and I suspect that using Redis might be a better option than the file system -- all operations are atomic so there's no risk of race conditions if the proper commands are used.
|
Apologies for spamming the issue, but here's a more complete example of what I described in the previous message. All of these Redis commands are atomic, which enables synchronization. Things become more complicated (but not impossible) when dealing with streaming RPCs.

from concurrent import futures
import json
import multiprocessing
import signal
import time
import uuid

import grpc
import interruptingcow
import redis

import widget_pb2        # generated protobuf/gRPC modules for the example widget service
import widget_pb2_grpc


class Worker(multiprocessing.Process):  # or use some other kind of subproc
    def __init__(self):
        super().__init__()
        # used as the task queue
        self.r0 = redis.StrictRedis(host='localhost', port=6379, db=0)
        # used to temporarily store requests
        self.r1 = redis.StrictRedis(host='localhost', port=6379, db=1)
        # used to temporarily store results
        self.r2 = redis.StrictRedis(host='localhost', port=6379, db=2)

    def run(self):
        while 1:
            task = self.r0.blpop('queue', timeout=1)  # wait forever w/ timeout=0
            if task:
                loaded_task = json.loads(task[1])
                try:
                    # interruptingcow is a convenient package for timing out operations,
                    # but you could substitute signal too
                    with interruptingcow.timeout(loaded_task['timeout'], RuntimeError):
                        request = self.r1.get(loaded_task['uid'])
                        result = process(request)  # do something with the request
                        self.r2.setex(loaded_task['uid'], 30, result)
                except RuntimeError:
                    print('RPC timed out')


class WidgetServicer(widget_pb2_grpc.WidgetServicer):
    def __init__(self):
        # used as a task queue
        self.r0 = redis.StrictRedis(host='localhost', port=6379, db=0)
        # used to temporarily store requests
        self.r1 = redis.StrictRedis(host='localhost', port=6379, db=1)
        # used to temporarily store results
        self.r2 = redis.StrictRedis(host='localhost', port=6379, db=2)

    def UnaryWidget(self, request, context):
        resp = widget_pb2.Response()
        uid = uuid.uuid4().hex
        task = {
            'uid': uid,
            '...': {},  # add any additional data provided by the request to the task
        }
        self.r1.setex(uid, 30, request.widget)
        task['timeout'] = context.time_remaining()
        self.r0.rpush('queue', json.dumps(task))
        while context.is_active():
            result = self.r2.get(uid)
            if result is not None:
                resp.widget = result
                break
            time.sleep(0.1)
        return resp


run = 1


def main():
    def handler(sig, frame):
        global run
        run = 0

    signal.signal(signal.SIGTERM, handler)
    signal.signal(signal.SIGINT, handler)
    workers = []
    for _ in range(4):
        p = Worker()  # multiprocessing.Process subclass or some other kind of subproc
        p.start()
        workers.append(p)
    executor = futures.ThreadPoolExecutor(max_workers=4)
    server = grpc.server(executor,
                         maximum_concurrent_rpcs=100)
    widget_pb2_grpc.add_WidgetServicer_to_server(WidgetServicer(), server)
    server.add_insecure_port('127.0.0.1:8443')  # please don't do this in production
    server.start()
    while run:
        for p in list(workers):
            if not p.is_alive():
                p.join()
                workers.remove(p)
                p = Worker()  # multiprocessing.Process subclass or some other kind of subproc
                p.start()
                workers.append(p)
        time.sleep(5)
    stop = server.stop(10)
    stop.wait()
    for p in list(workers):
        p.terminate()  # plain Process has no shutdown(); a custom subproc could expose one
        p.join()


if __name__ == '__main__':
    main()

There's some flexibility here with which Redis commands are used (e.g. I am explicitly not deleting keys from the Redis databases and instead letting the expiration time clean everything up), and a simple service would likely only need to utilize two Redis databases. I think this might be the only solution proposed so far that lets you safely integrate rotating subprocesses with a gRPC servicer that uses a single bound network address? |
This issue/PR has been automatically marked as stale because it has not had any update (including commits, comments, labels, milestones, etc) for 180 days. It will be closed automatically if no further update occurs in 1 day. Thank you for your contributions! |
What version of gRPC and what language are you using?
grpc 1.13.0 with python 3.6.5
What operating system (Linux, Windows, …) and version?
CentOS 7 (Linux 3.10.0-862.3.2.el7.x86_64)
What runtime / compiler are you using (e.g. python version or version of gcc)?
CPython 3.6.5
What did you do?
What did you expect to see?
Based on doc https://github.com/grpc/grpc/blob/master/doc/fork_support.md
gRPC server should run on 4 processes
What did you see instead?
Server crash
Make sure you include information that can help us debug (full error message, exception listing, stack trace, logs).
Anything else we should know about your project / environment?