Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IBM Code Engine] Added local disk variable for scheduler and worker #440

Merged
merged 2 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions dask_cloudprovider/cloudprovider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,11 @@ cloudprovider:
project_id: null
scheduler_cpu: "1.0"
scheduler_mem: 4G
scheduler_disk: 400M
scheduler_timeout: 600 # seconds
worker_cpu: "2.0"
worker_mem: 8G
worker_disk: 400M
worker_threads: 1

openstack:
Expand Down
25 changes: 25 additions & 0 deletions dask_cloudprovider/ibm/code_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@ def __init__(
project_id: str = None,
scheduler_cpu: str = None,
scheduler_mem: str = None,
scheduler_disk: str = None,
scheduler_timeout: int = None,
worker_cpu: str = None,
worker_mem: str = None,
worker_disk: str = None,
worker_threads: int = None,
api_key: str = None,
**kwargs,
Expand All @@ -56,9 +58,11 @@ def __init__(
self.project_id = project_id
self.scheduler_cpu = scheduler_cpu
self.scheduler_mem = scheduler_mem
self.scheduler_disk = scheduler_disk
self.scheduler_timeout = scheduler_timeout
self.worker_cpu = worker_cpu
self.worker_mem = worker_mem
self.worker_disk = worker_disk
self.worker_threads = worker_threads
self.api_key = api_key

Expand Down Expand Up @@ -89,6 +93,7 @@ async def create_vm(self):
scale_min_instances=1,
scale_concurrency=1000,
scale_memory_limit=self.memory,
scale_ephemeral_storage_limit=self.disk,
scale_request_timeout=self.cluster.scheduler_timeout,
run_env_variables=[
{
Expand Down Expand Up @@ -144,6 +149,7 @@ def create_job_run_thread():
run_commands=self.command,
scale_cpu_limit=self.cpu,
scale_memory_limit=self.memory,
scale_ephemeral_storage_limit=self.disk,
run_env_variables=[
{
"type": "config_map_key_reference",
Expand Down Expand Up @@ -194,6 +200,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.cpu = self.cluster.scheduler_cpu
self.memory = self.cluster.scheduler_mem
self.disk = self.cluster.scheduler_disk

self.command = [
"python",
Expand All @@ -211,9 +218,11 @@ async def start(self):
f"\n Project id: {self.project_id} "
f"\n Scheduler CPU: {self.cpu} "
f"\n Scheduler Memory: {self.memory} "
f"\n Scheduler Disk: {self.disk} "
f"\n Scheduler Timeout: {self.cluster.scheduler_timeout} "
f"\n Worker CPU: {self.cluster.worker_cpu} "
f"\n Worker Memory: {self.cluster.worker_mem} "
f"\n Worker Disk: {self.cluster.worker_disk} "
f"\n Worker Threads: {self.cluster.worker_threads} "
)
self.cluster._log(f"Creating scheduler instance {self.name}")
Expand Down Expand Up @@ -244,6 +253,7 @@ def __init__(
self.worker_options = worker_options
self.cpu = self.cluster.worker_cpu
self.memory = self.cluster.worker_mem
self.disk = self.cluster.worker_disk

# On this case, the worker must connect to the scheduler internal URL with the "ws" protocol and port 80
internal_scheduler = f"ws://{self.cluster.scheduler_internal_ip}:80"
Expand Down Expand Up @@ -300,6 +310,8 @@ class IBMCodeEngineCluster(VMCluster):
The amount of memory to allocate to the scheduler.

See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
scheduler_disk: str
The amount of ephemeral storage to allocate to the scheduler. This value must be lower than scheduler_mem.
scheduler_timeout: int
The timeout for the scheduler in seconds.
worker_cpu: str
Expand All @@ -310,6 +322,8 @@ class IBMCodeEngineCluster(VMCluster):
The amount of memory to allocate to each worker.

See: https://cloud.ibm.com/docs/codeengine?topic=codeengine-mem-cpu-combo
worker_disk: str
The amount of ephemeral storage to allocate to each worker. This value must be lower than worker_mem.
worker_threads: int
The number of threads to use on each worker.
debug: bool, optional
Expand Down Expand Up @@ -347,9 +361,11 @@ class IBMCodeEngineCluster(VMCluster):
Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0
Scheduler CPU: 0.25
Scheduler Memory: 1G
Scheduler Disk: 400M
Scheduler Timeout: 600
Worker CPU: 2
Worker Memory: 4G
Worker Disk: 400M
Creating scheduler dask-xxxxxxxx-scheduler
Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443
Scheduler is running
Expand Down Expand Up @@ -382,9 +398,12 @@ class IBMCodeEngineCluster(VMCluster):
Project id: f21626f6-54f7-4065-a038-75c8b9a0d2e0
Scheduler CPU: 0.25
Scheduler Memory: 1G
Scheduler Disk: 400M
Scheduler Timeout: 600
Worker CPU: 2
Worker Memory: 4G
Worker Disk: 400M
Worker Threads: 1
Creating scheduler dask-xxxxxxxx-scheduler
Waiting for scheduler to run at dask-xxxxxxxx-scheduler.xxxxxxxxxxxx.xx-xx.codeengine.appdomain.cloud:443
Scheduler is running
Expand All @@ -402,9 +421,11 @@ def __init__(
project_id: str = None,
scheduler_cpu: str = None,
scheduler_mem: str = None,
scheduler_disk: str = None,
scheduler_timeout: int = None,
worker_cpu: str = None,
worker_mem: str = None,
worker_disk: str = None,
worker_threads: int = 1,
debug: bool = False,
**kwargs,
Expand All @@ -419,11 +440,13 @@ def __init__(
api_key = self.config.get("api_key")
self.scheduler_cpu = scheduler_cpu or self.config.get("scheduler_cpu")
self.scheduler_mem = scheduler_mem or self.config.get("scheduler_mem")
self.scheduler_disk = scheduler_disk or self.config.get("scheduler_disk")
self.scheduler_timeout = scheduler_timeout or self.config.get(
"scheduler_timeout"
)
self.worker_cpu = worker_cpu or self.config.get("worker_cpu")
self.worker_mem = worker_mem or self.config.get("worker_mem")
self.worker_disk = worker_disk or self.config.get("worker_disk")
self.worker_threads = worker_threads or self.config.get("worker_threads")

self.debug = debug
Expand All @@ -436,9 +459,11 @@ def __init__(
"project_id": self.project_id,
"scheduler_cpu": self.scheduler_cpu,
"scheduler_mem": self.scheduler_mem,
"scheduler_disk": self.scheduler_disk,
"scheduler_timeout": self.scheduler_timeout,
"worker_cpu": self.worker_cpu,
"worker_mem": self.worker_mem,
"worker_disk": self.worker_disk,
"worker_threads": self.worker_threads,
"api_key": api_key,
}
Expand Down
Loading