This repository has been archived by the owner on May 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 161
/
test_threads.py
56 lines (40 loc) · 1.84 KB
/
test_threads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
from __future__ import print_function
from concurrent.futures import ThreadPoolExecutor
from opentracing.mocktracer import MockTracer
from ..testcase import OpenTracingTestCase
from ..utils import await_until
class TestThreads(OpenTracingTestCase):
    """Nested-callbacks scenario run on a thread pool.

    A span is started on the test thread and left unfinished; a chain of
    three executor tasks then re-activates that same span one after the
    other, each adding a tag, and the last link of the chain finishes it.
    """

    def setUp(self):
        """Create a fresh mock tracer and a three-worker thread pool."""
        self.tracer = MockTracer()
        self.executor = ThreadPoolExecutor(max_workers=3)

    def tearDown(self):
        """Shut the pool down without blocking on outstanding work."""
        self.executor.shutdown(wait=False)

    def test_main(self):
        """Check the chained tasks tag and finish the single span 'one'."""
        # The scope is closed here without finishing the span: the
        # callback chain started by submit() is responsible for that.
        with self.tracer.start_active_span('one', finish_on_close=False):
            self.submit()

        def span_finished():
            return len(self.tracer.finished_spans()) == 1

        # Poll rather than shutting the executor down and waiting:
        # in that case only the first callback would be executed and
        # the remaining ones would get canceled.
        await_until(span_finished, 5)

        finished = self.tracer.finished_spans()
        self.assertEqual(len(finished), 1)

        reported = finished[0]
        self.assertEqual(reported.operation_name, 'one')

        # Every link of the chain must have left its tag on the span.
        for index in range(1, 4):
            tag_name = 'key%s' % index
            self.assertEqual(reported.tags.get(tag_name), str(index))

    def submit(self):
        """Schedule the three-step callback chain for the active span.

        The currently active span is captured on the calling thread and
        handed to the worker tasks through their closures. Each step
        activates the span in its own thread, sets one tag and schedules
        the next step; only the last step activates the span with the
        second argument set to True, so that closing its scope finishes
        the span.
        """
        captured = self.tracer.scope_manager.active.span

        # The steps are defined last-to-first so each one can refer to
        # its successor; they still *run* in the order first -> last.
        def last_step():
            with self.tracer.scope_manager.activate(captured, True):
                captured.set_tag('key3', '3')

        def middle_step():
            with self.tracer.scope_manager.activate(captured, False):
                captured.set_tag('key2', '2')
                self.executor.submit(last_step)

        def first_step():
            with self.tracer.scope_manager.activate(captured, False):
                captured.set_tag('key1', '1')
                self.executor.submit(middle_step)

        self.executor.submit(first_step)