forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.py
76 lines (61 loc) · 2.57 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# Init gevent
from gevent import monkey
monkey.patch_all()
import asyncio
import logging
import signal
import gevent
from temporalio.client import Client
from temporalio.worker import Worker
from gevent_async import activity, workflow
from gevent_async.executor import GeventExecutor
def main():
logging.basicConfig(level=logging.INFO)
# Create single-worker gevent executor and run asyncio.run(async_main()) in
# it, waiting for result. This executor cannot be used for anything else in
# Temporal, it is just a single thread for running asyncio. This means that
# inside of async_main we must create another executor specifically for
# executing activity and workflow tasks.
with GeventExecutor(max_workers=1) as executor:
executor.submit(asyncio.run, async_main()).result()
async def async_main():
# Create ctrl+c handler. We do this by telling gevent on SIGINT to set the
# asyncio event. But asyncio calls are not thread safe, so we have to invoke
# it via call_soon_threadsafe.
interrupt_event = asyncio.Event()
gevent.signal_handler(
signal.SIGINT,
asyncio.get_running_loop().call_soon_threadsafe,
interrupt_event.set,
)
# Connect client
client = await Client.connect("localhost:7233")
# Create an executor for use by Temporal. This cannot be the outer one
# running this async main. The max_workers here needs to have enough room to
# support the max concurrent activities/workflows settings.
with GeventExecutor(max_workers=200) as executor:
# Run a worker for the workflow and activities
async with Worker(
client,
task_queue="gevent_async-task-queue",
workflows=[workflow.GreetingWorkflow],
activities=[
activity.compose_greeting_async,
activity.compose_greeting_sync,
],
# Set the executor for activities (only used for non-async
# activities) and workflow tasks
activity_executor=executor,
workflow_task_executor=executor,
# Set the max concurrent activities/workflows. These are the same as
# the defaults, but this makes it clear that the 100 + 100 = 200 for
# max_workers settings.
max_concurrent_activities=100,
max_concurrent_workflow_tasks=100,
):
# Wait until interrupted
logging.info("Worker started, ctrl+c to exit")
await interrupt_event.wait()
logging.info("Shutting down")
if __name__ == "__main__":
main()