-
Notifications
You must be signed in to change notification settings - Fork 6
/
example.py
57 lines (44 loc) · 1.28 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#!/usr/bin/env python3
import pytasking
import time
def hello(hello_queue):
    """Endlessly push a greeting onto *hello_queue*, pausing between pushes.

    ``main`` registers this function through ``task_manager.add_proc``
    (presumably so that it runs in its own process -- confirm against the
    pytasking documentation). The function never returns by itself.

    Args:
        hello_queue: Queue-like object exposing ``put_nowait``.
    """
    greeting = "Hello World!"
    pause_seconds = 1.5

    while True:
        hello_queue.put_nowait(greeting)
        # sync=True presumably selects pytasking's blocking (non-async)
        # sleep, which is what a plain function needs -- TODO confirm.
        pytasking.sleep(pause_seconds, sync=True)
async def ping():
    """Print "Ping!" and "Pong!" one second apart until cancelled.

    Each cycle prints "Ping!", awaits ``pytasking.sleep(1.0)`` and then
    prints "Pong!". When a ``CancelledError`` is raised inside the cycle the
    coroutine prints "Pang!" and returns instead of propagating it.
    """
    try:
        while True:
            print("Ping!")
            await pytasking.sleep(1.0)
            print("Pong!")
    except pytasking.asyncio.CancelledError:
        # Cancellation ends the loop; falling off the end returns normally.
        print("Pang!")
async def main(task_manager):
    """Drive the demo: consume greetings and repeatedly start/stop ``ping``.

    Registers ``hello`` with ``task_manager.add_proc`` together with a fresh
    queue, then loops until cancelled. Every iteration:

    1. deletes the ``hello`` proc once five greetings have been printed,
    2. prints at most one pending greeting from the queue,
    3. adds a ``ping`` task, awaits ``pytasking.sleep(0.5)`` and deletes that
       task again.

    Args:
        task_manager: Manager object; its ``add_proc``, ``delete_proc``,
            ``add_task`` and ``delete_task`` methods are used here.
    """
    # Function-scope import so the block does not depend on the file's
    # import section. Assumes pytasking.multiprocessing is the standard
    # library module, whose Queue.get_nowait raises queue.Empty -- confirm.
    from queue import Empty as QueueEmpty

    hellos = 0
    hello_proc_deleted = False
    hello_queue = pytasking.multiprocessing.Queue()
    hello_proc = task_manager.add_proc(hello, hello_queue)

    while True:
        try:
            # Delete the producer exactly once. Without the flag, `hellos`
            # stays at 5 after the queue drains and delete_proc would be
            # called again on every iteration.
            if not hello_proc_deleted and hellos >= 5:
                task_manager.delete_proc(hello_proc)
                hello_proc_deleted = True

            # Try the read and handle only the "nothing queued" case; any
            # other failure now surfaces instead of being swallowed.
            try:
                message = hello_queue.get_nowait()
            except QueueEmpty:
                pass
            else:
                print(message)
                hellos += 1

            # The ping task lives for half a second before being deleted,
            # which is shorter than its own one-second sleep.
            ping_task = task_manager.add_task(ping)
            await pytasking.sleep(0.5)
            task_manager.delete_task(ping_task)
        except pytasking.asyncio.CancelledError:
            break
if __name__ == "__main__":
    # Create the manager and register `main` as a task, passing the manager
    # itself so `main` can add and delete further tasks and procs.
    task_manager = pytasking.Manager()
    task_manager.add_task(main, task_manager)

    try:
        task_manager.start()
    except KeyboardInterrupt:
        # An interrupt (e.g. Ctrl-C) ends the script without a traceback.
        # Every other exception propagates unchanged, so no catch-all
        # re-raise clause is needed.
        pass