16
16
from array import array
17
17
from collections import deque
18
18
from concurrent .futures import Executor , Future , ThreadPoolExecutor
19
+ from contextvars import ContextVar
19
20
from time import sleep
20
21
from unittest import mock
21
22
@@ -97,35 +98,37 @@ async def inc(x):
97
98
assert end - start < 10
98
99
99
100
101
+ def test_sync (loop_in_thread ):
102
+ async def f (x , y ):
103
+ await asyncio .sleep (0.01 )
104
+ return x , y
105
+
106
+ result = sync (loop_in_thread , f , 1 , y = 2 )
107
+ assert result == (1 , 2 )
108
+
109
+
100
110
def test_sync_error (loop_in_thread ):
101
- loop = loop_in_thread
102
- try :
103
- result = sync (loop , throws , 1 )
104
- except Exception as exc :
105
- f = exc
106
- assert "hello" in str (exc )
107
- tb = get_traceback ()
108
- L = traceback .format_tb (tb )
109
- assert any ("throws" in line for line in L )
111
+ with pytest .raises (RuntimeError , match = "hello!" ) as exc :
112
+ sync (loop_in_thread , throws , 1 )
113
+
114
+ L = traceback .format_tb (exc .value .__traceback__ )
115
+ assert any ("throws" in line for line in L )
110
116
111
117
def function1 (x ):
112
118
return function2 (x )
113
119
114
120
def function2 (x ):
115
121
return throws (x )
116
122
117
- try :
118
- result = sync (loop , function1 , 1 )
119
- except Exception as exc :
120
- assert "hello" in str (exc )
121
- tb = get_traceback ()
122
- L = traceback .format_tb (tb )
123
- assert any ("function1" in line for line in L )
124
- assert any ("function2" in line for line in L )
123
+ with pytest .raises (RuntimeError , match = "hello!" ) as exc :
124
+ sync (loop_in_thread , function1 , 1 )
125
+
126
+ L = traceback .format_tb (exc .value .__traceback__ )
127
+ assert any ("function1" in line for line in L )
128
+ assert any ("function2" in line for line in L )
125
129
126
130
127
131
def test_sync_timeout (loop_in_thread ):
128
- loop = loop_in_thread
129
132
with pytest .raises (TimeoutError ):
130
133
sync (loop_in_thread , asyncio .sleep , 0.5 , callback_timeout = 0.05 )
131
134
@@ -145,6 +148,21 @@ async def get_loop():
145
148
exc_info .match ("IOLoop is clos(ed|ing)" )
146
149
147
150
151
+ def test_sync_contextvars (loop_in_thread ):
152
+ """Test that sync() propagates contextvars - namely,
153
+ distributed.metrics.context_meter callbacks
154
+ """
155
+ v = ContextVar ("v" , default = 0 )
156
+
157
+ async def f ():
158
+ return v .get ()
159
+
160
+ assert sync (loop_in_thread , f ) == 0
161
+ tok = v .set (1 )
162
+ assert sync (loop_in_thread , f ) == 1
163
+ v .reset (tok )
164
+
165
+
148
166
def test_is_kernel ():
149
167
pytest .importorskip ("IPython" )
150
168
assert is_kernel () is False
0 commit comments