Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add heartbeat #824

Merged
merged 12 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions libs/libcommon/src/libcommon/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __get__(self, instance: object, cls: Type[U]) -> QuerySet[U]:
# END monkey patching ### hack ###


class Status(enum.Enum):
class Status(str, enum.Enum):
WAITING = "waiting"
STARTED = "started"
SUCCESS = "success"
Expand All @@ -46,7 +46,7 @@ class Status(enum.Enum):
SKIPPED = "skipped"


class Priority(str, enum.Enum):
    """Priority of a job in the queue.

    Inherits from ``str`` so members compare equal to their string value and
    serialize transparently (e.g. in JSON payloads and MongoDB documents).
    """

    NORMAL = "normal"
    LOW = "low"

Expand All @@ -64,6 +64,7 @@ class JobDict(TypedDict):
created_at: datetime
started_at: Optional[datetime]
finished_at: Optional[datetime]
last_heartbeat: Optional[datetime]


class JobInfo(TypedDict):
Expand Down Expand Up @@ -121,6 +122,7 @@ class Job(Document):
created_at (`datetime`): The creation date of the job.
started_at (`datetime`, optional): When the job has started.
finished_at (`datetime`, optional): When the job has finished.
last_heartbeat (`datetime`, optional): Last time the running job got a heartbeat from the worker.
"""

meta = {
Expand Down Expand Up @@ -148,6 +150,7 @@ class Job(Document):
created_at = DateTimeField(required=True)
started_at = DateTimeField()
finished_at = DateTimeField()
last_heartbeat = DateTimeField()
AndreaFrancis marked this conversation as resolved.
Show resolved Hide resolved

def to_dict(self) -> JobDict:
return {
Expand All @@ -163,6 +166,7 @@ def to_dict(self) -> JobDict:
"created_at": self.created_at,
"started_at": self.started_at,
"finished_at": self.finished_at,
"last_heartbeat": self.last_heartbeat,
}

objects = QuerySetManager["Job"]()
Expand Down
6 changes: 4 additions & 2 deletions services/worker/dev.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ COPY services/worker/poetry.lock ./services/worker/poetry.lock
COPY services/worker/pyproject.toml ./services/worker/pyproject.toml
COPY libs/libcommon ./libs/libcommon
WORKDIR /src/services/worker/
RUN poetry install --no-cache
RUN --mount=type=cache,target=/home/.cache/pypoetry/cache \
--mount=type=cache,target=/home/.cache/pypoetry/artifacts \
poetry install --no-root

# FOR LOCAL DEVELOPMENT ENVIRONMENT
# No need to copy the source code since we map a volume in docker-compose-base.yaml
# Removed: COPY services/worker/src ./src
# Removed: RUN poetry install --no-cache
# However we need to install the package when the container starts
# Added: poetry install
ENTRYPOINT ["/bin/sh", "-c" , "poetry install && poetry run python src/worker/main.py"]
ENTRYPOINT ["/bin/sh", "-c" , "poetry install --only-root && poetry run python src/worker/main.py"]
33 changes: 32 additions & 1 deletion services/worker/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions services/worker/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ transformers = "^4.26.1"
trec-car-tools = { path = "vendors/trec-car-tools/python3" }
typer = "^0.4.2"
wget = "^3.2"
mirakuru = "^2.4.2"
severo marked this conversation as resolved.
Show resolved Hide resolved

[tool.poetry.group.dev.dependencies]
bandit = "^1.7.4"
Expand Down
7 changes: 7 additions & 0 deletions services/worker/src/worker/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
WORKER_MAX_LOAD_PCT = 70
WORKER_MAX_MEMORY_PCT = 80
WORKER_SLEEP_SECONDS = 15
WORKER_HEARTBEAT_TIME_INTERVAL_SECONDS = 60
severo marked this conversation as resolved.
Show resolved Hide resolved


def get_empty_str_list() -> List[str]:
Expand All @@ -34,6 +35,8 @@ class WorkerConfig:
only_job_types: list[str] = field(default_factory=get_empty_str_list)
sleep_seconds: int = WORKER_SLEEP_SECONDS
storage_paths: List[str] = field(default_factory=get_empty_str_list)
state_path: Optional[str] = None
heartbeat_time_interval_seconds: int = WORKER_HEARTBEAT_TIME_INTERVAL_SECONDS
severo marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def from_env(cls) -> "WorkerConfig":
Expand All @@ -47,6 +50,10 @@ def from_env(cls) -> "WorkerConfig":
sleep_seconds=env.int(name="SLEEP_SECONDS", default=WORKER_SLEEP_SECONDS),
only_job_types=env.list(name="ONLY_JOB_TYPES", default=get_empty_str_list()),
storage_paths=env.list(name="STORAGE_PATHS", default=get_empty_str_list()),
state_path=env.str(name="STATE_PATH", default=None),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we could move the default value WORKER_STATE_PATH here ("worker_state.json"); almost all configs have their own default value

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WORKER_STATE_PATH has no default - it changes for every session, using a temporary directory.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's already what we do with the assets directory: https://github.com/huggingface/datasets-server/blob/main/libs/libcommon/src/libcommon/config.py#L14

Note: I prefer to always indirect via a constant, ie. WORKER_STATE_PATH: Optional[str] = None. Not sure if it's an exaggeration, though.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another comment: I didn't get that STATE_PATH is the state file basename (we create a file by appending .lock at the end at https://github.com/huggingface/datasets-server/pull/824/files#diff-643260fba42f231dbf4e91102b52af6acb3c216a0eb6f538ac42bc667ebe5381R145). Maybe the name could be more descriptive, or maybe we could replace by STATE_FILENAME and just use it without appending .lock?

heartbeat_time_interval_seconds=env.int(
name="HEARTBEAT_TIME_INTERVAL_SECONDS", default=WORKER_HEARTBEAT_TIME_INTERVAL_SECONDS
),
severo marked this conversation as resolved.
Show resolved Hide resolved
)


Expand Down
21 changes: 19 additions & 2 deletions services/worker/src/worker/loop.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 The HuggingFace Authors.

import json
import logging
import random
import time
from dataclasses import dataclass, field
from typing import Optional, TypedDict

from libcommon.queue import EmptyQueueError, Queue
from filelock import FileLock
from libcommon.queue import EmptyQueueError, JobInfo, Queue
from psutil import cpu_count, disk_usage, getloadavg, swap_memory, virtual_memory

from worker.config import WorkerConfig
Expand All @@ -17,6 +20,10 @@ class UnknownJobTypeError(Exception):
pass


class WorkerState(TypedDict):
    """State shared between the worker loop and its supervising executor.

    It is serialized to a JSON file (see ``Loop.set_worker_state``) so that the
    executor process can read which job, if any, the loop is working on.
    """

    # Info of the job currently being processed, or None when the loop is idle.
    current_job_info: Optional[JobInfo]


@dataclass
class Loop:
"""
Expand Down Expand Up @@ -96,7 +103,7 @@ def sleep(self) -> None:
time.sleep(duration)

def run(self) -> None:
logging.info("Worker started")
logging.info("Worker loop started")
try:
while True:
if self.has_resources() and self.process_next_job():
Expand All @@ -119,13 +126,23 @@ def process_next_job(self) -> bool:
f" ${', '.join(self.worker_config.only_job_types)}). The queue should not have provided this"
" job. It is in an inconsistent state. Please report this issue to the datasets team."
)
self.set_worker_state(current_job_info=job_info)
logging.debug(f"job assigned: {job_info}")
except EmptyQueueError:
self.set_worker_state(current_job_info=None)
logging.debug("no job in the queue")
return False

job_runner = self.job_runner_factory.create_job_runner(job_info)
finished_status = job_runner.run()
self.queue.finish_job(job_id=job_runner.job_id, finished_status=finished_status)
self.set_worker_state(current_job_info=None)
logging.debug(f"job finished with {finished_status.value}: {job_runner}")
return True

def set_worker_state(self, current_job_info: Optional[JobInfo]) -> None:
    """Persist the loop's current job info to the worker state file.

    The state is written as JSON to ``worker_config.state_path`` under a
    ``.lock`` file lock, so the supervising executor can read it concurrently
    without observing a partially written file. A no-op when ``state_path``
    is not configured.

    Args:
        current_job_info: Info of the job being processed, or None when idle.
    """
    worker_state: WorkerState = {"current_job_info": current_job_info}
    if self.worker_config.state_path:
        with FileLock(self.worker_config.state_path + ".lock"):
            with open(self.worker_config.state_path, "w") as worker_state_f:
                json.dump(worker_state, worker_state_f)
129 changes: 83 additions & 46 deletions services/worker/src/worker/main.py
Original file line number Diff line number Diff line change
@@ -1,53 +1,90 @@
# SPDX-License-Identifier: Apache-2.0
# Copyright 2022 The HuggingFace Authors.
import json
severo marked this conversation as resolved.
Show resolved Hide resolved
import logging
import os
import sys
import tempfile
import time
from typing import Optional

from filelock import FileLock
from libcommon.log import init_logging
from libcommon.processing_graph import ProcessingGraph
from libcommon.resources import CacheMongoResource, QueueMongoResource
from libcommon.storage import init_assets_dir
from libcommon.queue import Job, Status, get_datetime
from libcommon.resources import QueueMongoResource
from mirakuru import OutputExecutor

from worker import start_worker_loop
from worker.config import AppConfig
from worker.job_runner_factory import JobRunnerFactory
from worker.loop import Loop
from worker.resources import LibrariesResource
from worker.loop import WorkerState

WORKER_STATE_FILE_NAME = "worker_state.json"
START_WORKER_LOOP_PATH = start_worker_loop.__file__


class WorkerExecutor:
    """Supervises the worker loop subprocess and reports heartbeats.

    The executor launches the worker loop in a child process (via mirakuru's
    ``OutputExecutor``, which waits for a known banner to be printed), then
    periodically updates the ``last_heartbeat`` field of the job the loop is
    currently processing, as read from the shared worker state file.
    """

    def __init__(self, app_config: AppConfig) -> None:
        self.app_config = app_config

    def _create_worker_loop_executor(self) -> OutputExecutor:
        """Build the executor for the worker loop subprocess.

        The worker state path doubles as the "banner": the child process
        prints it on startup (``--print-worker-state-path``) and ``start()``
        blocks until that line is seen.

        Raises:
            ValueError: if WORKER_STATE_PATH is not configured.
        """
        banner = self.app_config.worker.state_path
        if not banner:
            raise ValueError("Failed to create the executor because WORKER_STATE_PATH is missing.")
        start_worker_loop_command = [
            sys.executable,
            START_WORKER_LOOP_PATH,
            "--print-worker-state-path",
        ]
        return OutputExecutor(start_worker_loop_command, banner, timeout=10)

    def start(self) -> None:
        """Start the worker loop subprocess and heartbeat until it exits."""
        worker_loop_executor = self._create_worker_loop_executor()
        worker_loop_executor.start()  # blocking until the banner is printed
        logging.info("Starting heartbeat.")
        while worker_loop_executor.running():
            self.heartbeat()
            time.sleep(self.app_config.worker.heartbeat_time_interval_seconds)
        worker_loop_executor.stop()

    def get_state(self) -> WorkerState:
        """Read the worker state file written by the loop.

        Returns an empty state when the file does not exist or contains
        invalid JSON (e.g. it is being rewritten).

        Raises:
            ValueError: if WORKER_STATE_PATH is not configured.
        """
        worker_state_path = self.app_config.worker.state_path
        if not worker_state_path:
            raise ValueError("Failed to get worker state because WORKER_STATE_PATH is missing.")
        if os.path.exists(worker_state_path):
            with FileLock(worker_state_path + ".lock"):
                try:
                    with open(worker_state_path, "r") as worker_state_f:
                        worker_state = json.load(worker_state_f)
                        return WorkerState(current_job_info=worker_state.get("current_job_info"))
                except json.JSONDecodeError:
                    return WorkerState(current_job_info=None)
        else:
            return WorkerState(current_job_info=None)

    def get_current_job(self) -> Optional[Job]:
        """Return the Job the loop is processing, or None.

        Only returns the job if it still exists in the queue database and is
        still in the STARTED state (i.e. not finished or cancelled meanwhile).
        """
        worker_state = self.get_state()
        if worker_state["current_job_info"]:
            job = Job.objects.with_id(worker_state["current_job_info"]["job_id"])  # type: ignore
            if job and isinstance(job, Job) and job.status == Status.STARTED:
                return job
        return None

    def heartbeat(self) -> None:
        """Record a heartbeat timestamp on the currently running job, if any."""
        current_job = self.get_current_job()
        if current_job:
            current_job.update(last_heartbeat=get_datetime())


if __name__ == "__main__":
    # The worker state file lives in a per-session temporary directory unless
    # WORKER_STATE_PATH is provided explicitly. It must be set in the
    # environment BEFORE AppConfig.from_env() so the child loop process
    # (spawned by WorkerExecutor) inherits the same path.
    with tempfile.TemporaryDirectory() as tmp_dir:
        if "WORKER_STATE_PATH" not in os.environ:
            os.environ["WORKER_STATE_PATH"] = os.path.join(tmp_dir, WORKER_STATE_FILE_NAME)

        app_config = AppConfig.from_env()
        init_logging(log_level=app_config.common.log_level)
        # ^ set first to have logs as soon as possible

        with QueueMongoResource(
            database=app_config.queue.mongo_database, host=app_config.queue.mongo_url
        ) as queue_resource:
            if not queue_resource.is_available():
                raise RuntimeError("The connection to the queue database could not be established. Exiting.")
            worker_executor = WorkerExecutor(app_config)
            worker_executor.start()
Loading