Skip to content

Commit

Permalink
fix: 🐛 fix the query to get the list of jobs in the queue (#271)
Browse files Browse the repository at this point in the history
* fix: 🐛 fix the query to get the list of jobs in the queue

we did a lot of unnecessary lookups.

* feat: 🎸 upgrade dependencies
  • Loading branch information
severo authored May 16, 2022
1 parent f071f42 commit 7ae7942
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 100 deletions.
8 changes: 6 additions & 2 deletions libs/libqueue/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@ include ../../tools/Common.mk

.PHONY: test
# Run the unit tests against a throwaway MongoDB started via docker-compose
# (bound to host port 27020 — see tests/docker-compose.yml), then tear it down.
# NOTE(review): if pytest fails, `down` is not executed and the container is
# left running — consider a trap/`|| true` pattern if that matters in CI.
test:
	docker-compose -f tests/docker-compose.yml up -d --remove-orphans
	MONGO_URL="mongodb://localhost:27020" MONGO_QUEUE_DATABASE="datasets_server_queue_test" poetry run python -m pytest -x tests
	docker-compose -f tests/docker-compose.yml down

.PHONY: coverage
# Same flow as `test`, but collect coverage and emit coverage.xml for CI upload.
coverage:
	docker-compose -f tests/docker-compose.yml up -d --remove-orphans
	MONGO_URL="mongodb://localhost:27020" MONGO_QUEUE_DATABASE="datasets_server_queue_test" poetry run python -m pytest -s --cov --cov-report xml:coverage.xml --cov-report=term tests
	docker-compose -f tests/docker-compose.yml down
35 changes: 20 additions & 15 deletions libs/libqueue/src/libqueue/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,28 @@ def get_finished(jobs: QuerySet[AnyJob]) -> QuerySet[AnyJob]:
return jobs(status__nin=[Status.WAITING, Status.STARTED])


def get_excluded_dataset_names(jobs: QuerySet[AnyJob], max_jobs_per_dataset: Optional[int] = None) -> List[str]:
    """Return the names of datasets that already have too many started jobs.

    A dataset is excluded when it has at least ``max_jobs_per_dataset`` jobs in
    the STARTED state. If ``max_jobs_per_dataset`` is None, no dataset is ever
    excluded.

    Args:
        jobs: the job QuerySet to inspect (only STARTED jobs are counted).
        max_jobs_per_dataset: maximum number of simultaneously started jobs
            allowed per dataset, or None for no limit.

    Returns:
        The list of dataset names at (or above) the limit.
    """
    if max_jobs_per_dataset is None:
        return []
    # local import: the module's import block is outside this view
    from collections import Counter

    # Count started jobs per dataset in one O(n) pass instead of calling
    # list.count() per name, which was O(n^2).
    started_counts = Counter(
        job.dataset_name for job in jobs(status=Status.STARTED).only("dataset_name")
    )
    return [name for name, count in started_counts.items() if count >= max_jobs_per_dataset]


def start_job(jobs: QuerySet[AnyJob], max_jobs_per_dataset: Optional[int] = None) -> AnyJob:
    """Pick the oldest waiting job, mark it STARTED, and return it.

    Datasets that already have ``max_jobs_per_dataset`` started jobs are
    skipped, so one dataset cannot monopolize the workers.

    Args:
        jobs: the job QuerySet to pick from.
        max_jobs_per_dataset: maximum number of simultaneously started jobs
            allowed per dataset, or None for no limit.

    Returns:
        The job that was just started.

    Raises:
        EmptyQueue: if no waiting job is available within the limit.
    """
    excluded_dataset_names = get_excluded_dataset_names(jobs, max_jobs_per_dataset)
    next_waiting_job = (
        jobs(status=Status.WAITING, dataset_name__nin=excluded_dataset_names)
        .order_by("+created_at")
        .no_cache()
        .first()
    )
    # ^ no_cache should generate a query on every iteration, which should solve concurrency issues between workers
    if next_waiting_job is None:
        # f-prefix was missing here, which emitted the literal placeholder text
        raise EmptyQueue(f"no job available (within the limit of {max_jobs_per_dataset} started jobs per dataset)")
    next_waiting_job.update(started_at=datetime.utcnow(), status=Status.STARTED)
    return next_waiting_job


def get_dataset_job(max_jobs_per_dataset: Optional[int] = None) -> Tuple[str, str]:
Expand Down
10 changes: 10 additions & 0 deletions libs/libqueue/tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: "3.9"
services:
  # Throwaway MongoDB instance used by the libqueue unit tests; the Makefile
  # brings it up before pytest and tears it down afterwards.
  mongodb-test-libqueue:
    image: mongo
    volumes:
      # Persist data in a named volume so restarts within a test run keep state.
      - mongo-test-libqueue:/data/db:rw
    ports:
      # Host port 27020 avoids clashing with a locally-running MongoDB (27017);
      # matches MONGO_URL="mongodb://localhost:27020" in the Makefile.
      - 27020:27017
volumes:
  mongo-test-libqueue:
19 changes: 19 additions & 0 deletions libs/libqueue/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
connect_to_queue,
finish_dataset_job,
get_dataset_job,
get_split_job,
is_dataset_in_queue,
is_split_in_queue,
)
Expand Down Expand Up @@ -50,6 +51,24 @@ def test_add_job() -> None:
finish_dataset_job(job_id, success=True)


def test_max_jobs_per_dataset() -> None:
    """Check that get_split_job honors the max_jobs_per_dataset limit."""
    # Queue three splits of the same dataset.
    add_split_job("dataset", "config", "split1")
    add_split_job("dataset", "config", "split2")
    add_split_job("dataset", "config", "split3")
    # With no limit, the oldest waiting job (split1) is started.
    _, dataset_name, config_name, split_name = get_split_job()
    assert dataset_name == "dataset"
    assert config_name == "config"
    assert split_name == "split1"
    # Limit 0: the dataset is already at/over the limit, so nothing is available.
    with pytest.raises(EmptyQueue):
        get_split_job(0)
    # Limit 1: split1 is still started, so the dataset is saturated.
    with pytest.raises(EmptyQueue):
        get_split_job(1)
    # Limit 2: one more job for the dataset may start -> split2.
    _, dataset_name, config_name, split_name = get_split_job(2)
    assert split_name == "split2"
    # Now two jobs are started, so limit 2 is reached again.
    with pytest.raises(EmptyQueue):
        get_split_job(2)


def test_is_dataset_in_queue() -> None:
dataset_name = "test_dataset"
dataset_name_2 = "test_dataset_2"
Expand Down
51 changes: 26 additions & 25 deletions services/api/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 7ae7942

Please sign in to comment.