Skip to content

Commit

Permalink
do some more conversion from unittest to pytest syntax
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed Aug 17, 2023
1 parent 2fd01d3 commit 645a444
Showing 1 changed file with 29 additions and 30 deletions.
59 changes: 29 additions & 30 deletions tests/integration/test_celeryadapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def test_build_set_of_queues(self, launch_workers, worker_queue_map):
result = celeryadapter.build_set_of_queues(
steps=["all"], spec=None, specific_queues=None, verbose=False, app=celery_app
)
self.assertEqual(result, set(worker_queue_map.values()))
assert result == set(worker_queue_map.values())

def test_query_celery_queues(self, launch_workers):
"""
Expand All @@ -79,7 +79,7 @@ def test_get_running_queues(self, launch_workers, worker_queue_map):
This should return a list of active queues.
"""
result = celeryadapter.get_running_queues("celery_app")
self.assertEqual(sorted(result), sorted(list(worker_queue_map.values())))
assert sorted(result) sorted(list(worker_queue_map.values()))

def test_get_queues_active(self, launch_workers, worker_queue_map):
"""
Expand All @@ -91,14 +91,13 @@ def test_get_queues_active(self, launch_workers, worker_queue_map):
queue_result, worker_result = celeryadapter.get_queues(celery_app)

# Ensure we got output before looping
self.assertEqual(len(queue_result), 3)
self.assertEqual(len(worker_result), 3)
assert len(queue_result) == len(worker_result) == 3

for worker, queue in worker_queue_map.items():
# Check that the entry in the queue_result dict for this queue is correct
self.assertIn(queue, queue_result)
self.assertEqual(len(queue_result[queue]), 1)
self.assertIn(worker, queue_result[queue][0])
assert queue in queue_result
assert len(queue_result[queue]) == 1
assert worker in queue_result[queue][0]

# Remove this entry from the queue_result dict
del queue_result[queue]
Expand All @@ -113,8 +112,8 @@ def test_get_queues_active(self, launch_workers, worker_queue_map):
assert worker_found

# Ensure there was no extra output that we weren't expecting
self.assertEqual(queue_result, {})
self.assertEqual(worker_result, [])
assert queue_result == {}
assert worker_result == []


class TestInactiveQueues:
Expand All @@ -123,17 +122,17 @@ class TestInactiveQueues:
It will run tests where we don't need any active queues to interact with.
"""

def test_build_set_of_queues(self):
def test_build_set_of_queues(self, celery_app):
"""
Test the build_set_of_queues function with no queues active.
This should return an empty set.
"""
result = celeryadapter.build_set_of_queues(
steps=["all"], spec=None, specific_queues=None, verbose=False, app=celery_app
)
self.assertEqual(result, set())
assert result == set()

def test_build_set_of_queues_with_spec(self):
def test_build_set_of_queues_with_spec(self, celery_app):
"""
Test the build_set_of_queues function with a spec provided as input.
This should return a set of the queues defined in the spec file.
Expand All @@ -159,9 +158,9 @@ def test_build_set_of_queues_with_spec(self):
for i, output in enumerate(expected_output):
expected_output[i] = f"{merlin_tag}{output}"

self.assertEqual(result, set(expected_output))
assert result == set(expected_output)

def test_build_set_of_queues_with_specific_queues(self):
def test_build_set_of_queues_with_specific_queues(self, celery_app):
"""
Test the build_set_of_queues function with specific queues provided as input.
This should return a set of all the queues listed in the specific_queues argument.
Expand All @@ -174,9 +173,9 @@ def test_build_set_of_queues_with_specific_queues(self):
steps=["all"], spec=None, specific_queues=specific_queues, verbose=False, app=celery_app
)

self.assertEqual(result, set(specific_queues))
assert result == set(specific_queues)

def test_build_set_of_queues_with_specific_queues_and_spec(self):
def test_build_set_of_queues_with_specific_queues_and_spec(self, celery_app):
"""
Test the build_set_of_queues function with specific queues and a yaml spec provided as input.
The specific queues provided here will have a mix of queues that exist in the spec and
Expand All @@ -198,9 +197,9 @@ def test_build_set_of_queues_with_specific_queues_and_spec(self):
# Build the expected output list
expected_output = [f"[merlin]_{queue}" for queue in valid_queues]

self.assertEqual(result, set(expected_output))
assert result == set(expected_output)

def test_build_set_of_queues_with_steps_and_spec(self):
def test_build_set_of_queues_with_steps_and_spec(self, celery_app):
"""
Test the build_set_of_queues function with steps and a yaml spec provided as input.
This should return the queues associated with the steps that we provide.
Expand All @@ -220,7 +219,7 @@ def test_build_set_of_queues_with_steps_and_spec(self):
for i, output in enumerate(expected_output):
expected_output[i] = f"{merlin_tag}{output}"

self.assertEqual(result, set(expected_output))
assert result == set(expected_output)

def test_query_celery_queues(self):
"""
Expand Down Expand Up @@ -252,7 +251,7 @@ def test_build_csv_queue_info(self, worker_queue_map):

# Run the test
result = celeryadapter.build_csv_queue_info(query_return, date)
self.assertEqual(result, expected_output)
assert result == expected_output

def test_build_json_queue_info(self, worker_queue_map):
"""
Expand All @@ -274,7 +273,7 @@ def test_build_json_queue_info(self, worker_queue_map):

# Run the test
result = celeryadapter.build_json_queue_info(query_return, date)
self.assertEqual(result, expected_output)
assert result == expected_output

def test_dump_celery_queue_info_csv(self, worker_queue_map):
"""
Expand Down Expand Up @@ -303,18 +302,18 @@ def test_dump_celery_queue_info_csv(self, worker_queue_map):
with open(outfile, "r") as csv_df:
csv_dump_data = csv.DictReader(csv_df)
# Make sure a timestamp field was created
self.assertIn("time", csv_dump_data.fieldnames)
assert "time" in csv_dump_data.fieldnames

# Format the csv data that we just read in
csv_dump_output = _format_csv_data(csv_dump_data)

# We did one dump so we should only have 1 timestamp; we don't care about the value
self.assertEqual(len(csv_dump_output["time"]), 1)
assert len(csv_dump_output["time"]) == 1
del csv_dump_output["time"]

# Make sure the rest of the csv file was created as expected
dump_diff = DeepDiff(csv_dump_output, expected_output)
self.assertEqual(dump_diff, {})
assert dump_diff == {}
finally:
try:
os.remove(outfile)
Expand Down Expand Up @@ -350,7 +349,7 @@ def test_dump_celery_queue_info_json(self, worker_queue_map):
# There should only be one entry in the json dump file so this will only 'loop' once
for dump_entry in json_df_contents.values():
json_dump_diff = DeepDiff(dump_entry, expected_output)
self.assertEqual(json_dump_diff, {})
assert json_dump_diff == {}
finally:
try:
os.remove(outfile)
Expand All @@ -372,22 +371,22 @@ def test_celerize_queues(self, worker_queue_map):

# Ensure the queue tag was added to every queue
for queue in queues_to_check:
self.assertIn(queue_tag, queue)
assert queue_tag in queue

def test_get_running_queues(self):
"""
Test the get_running_queues function with no queues active.
This should return an empty list.
"""
result = celeryadapter.get_running_queues("celery_app")
self.assertEqual(result, [])
assert result == []

def test_get_queues(self):
def test_get_queues(self, celery_app):
"""
Test the get_queues function with no queues active.
This should return a tuple where the first entry is an empty dict
and the second entry is an empty list.
"""
queue_result, worker_result = celeryadapter.get_queues(celery_app)
self.assertEqual(queue_result, {})
self.assertEqual(worker_result, [])
assert queue_result == {}
assert worker_result == []

0 comments on commit 645a444

Please sign in to comment.