Skip to content

Commit

Permalink
handle uninitialized tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
Naor Malca committed Oct 26, 2022
1 parent 9d6ba63 commit 5bcdfb5
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,17 @@ def _trace_before_publish(self, *args, **kwargs):
task = utils.retrieve_task_from_sender(kwargs)
task_id = utils.retrieve_task_id_from_message(kwargs)

if task is None or task_id is None:
if task_id is None:
return

operation_name = f"{_TASK_APPLY_ASYNC}/{task.name}"
if task is None:
# task is an anonymous task send using send_task or using canvas workflow
# Signatures() to send to a task not in the current processes dependency
# tree
task_name = kwargs.get("sender", "unknown")
else:
task_name = task.name
operation_name = f"{_TASK_APPLY_ASYNC}/{task_name}"
span = self._tracer.start_span(
operation_name, kind=trace.SpanKind.PRODUCER
)
Expand All @@ -195,7 +202,7 @@ def _trace_before_publish(self, *args, **kwargs):
if span.is_recording():
span.set_attribute(_TASK_TAG_KEY, _TASK_APPLY_ASYNC)
span.set_attribute(SpanAttributes.MESSAGING_MESSAGE_ID, task_id)
span.set_attribute(_TASK_NAME_KEY, task.name)
span.set_attribute(_TASK_NAME_KEY, task_name)
utils.set_attributes_from_context(span, kwargs)

activation = trace.use_span(span, end_on_exit=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ def attach_span(task, task_id, span, is_publish=False):
NOTE: We cannot test for this well yet, because we do not run a celery worker,
and cannot run `task.apply_async()`
"""
if task is None:
return
span_dict = getattr(task, CTX_KEY, None)
if span_dict is None:
span_dict = {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ def test_task(self):
},
)

self.assertEqual(
producer.name, "apply_async/tests.celery_test_tasks.task_add"
)
self.assertEqual(producer.name, "apply_async/tests.celery_test_tasks.task_add")
self.assertEqual(producer.kind, SpanKind.PRODUCER)
self.assertSpanHasAttributes(
producer,
Expand All @@ -77,3 +75,45 @@ def test_task(self):
self.assertNotEqual(consumer.parent, producer.context)
self.assertEqual(consumer.parent.span_id, producer.context.span_id)
self.assertEqual(consumer.context.trace_id, producer.context.trace_id)


class TestCelerySignatureTask(TestBase):
def setUp(self):
super().setUp()

def start_app(*args, **kwargs):
# Add an additional task that will not be registered with parent thread
@app.task
def hidden_task(x):
return x * 2

self._worker = app.Worker(app=app, pool="solo", concurrency=1)
return self._worker.start(*args, **kwargs)

self._thread = threading.Thread(target=start_app)
self._worker = app.Worker(app=app, pool="solo", concurrency=1)
self._thread.daemon = True
self._thread.start()

def tearDown(self):
super().tearDown()
self._worker.stop()
self._thread.join()

def test_hidden_task(self):
# no-op since already instrumented
CeleryInstrumentor().instrument()
import ipdb

ipdb.set_trace()

res = app.signature("app.hidden_task", (2,)).apply_async()
while not res.ready():
time.sleep(0.05)

spans = self.sorted_spans(self.memory_exporter.get_finished_spans())
self.assertEqual(len(spans), 1)

producer = spans

self.assertEqual(producer.name, "apply_async/app.hidden_task")
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,13 @@ def fn_task():
utils.detach_span(fn_task, task_id)
self.assertEqual(utils.retrieve_span(fn_task, task_id), (None, None))

def test_optional_task_span_attach(self):
task_id = "7c6731af-9533-40c3-83a9-25b58f0d837f"
span = trace._Span("name", mock.Mock(spec=trace_api.SpanContext))

# assert this is is a no-aop
self.assertIsNone(utils.attach_span(None, task_id, span))

def test_span_delete_empty(self):
# ensure detach_span doesn't raise an exception if span is not present
@self.app.task
Expand Down

0 comments on commit 5bcdfb5

Please sign in to comment.