diff --git a/DEVELOPER_GUIDE.md b/DEVELOPER_GUIDE.md index d80c95b8fb..d446e6d012 100644 --- a/DEVELOPER_GUIDE.md +++ b/DEVELOPER_GUIDE.md @@ -155,7 +155,7 @@ GITHUB_TOKEN=xxx ## Mac OS -To install the [datasets based worker](./services/worker) on Mac OS, you can follow the next steps. +To install the [worker](./services/worker) on Mac OS, you can follow the next steps. ### First: as an administrator diff --git a/chart/env/dev.yaml b/chart/env/dev.yaml index 7fb0392adc..6f1a775def 100644 --- a/chart/env/dev.yaml +++ b/chart/env/dev.yaml @@ -239,7 +239,3 @@ sizes: limits: cpu: 1 memory: "4Gi" - -# --- datasets_based --- -datasetsBased: - contentMaxBytes: "10_000_000" diff --git a/chart/env/prod.yaml b/chart/env/prod.yaml index 13bdb59811..8b324fd262 100644 --- a/chart/env/prod.yaml +++ b/chart/env/prod.yaml @@ -309,7 +309,3 @@ sizes: limits: cpu: 2 memory: "1Gi" - -# --- datasets_based --- -datasetsBased: - contentMaxBytes: "10_000_000" diff --git a/chart/templates/_envDatasetsBased.tpl b/chart/templates/_envDatasetsBased.tpl index ee8fea751e..05d1ed2930 100644 --- a/chart/templates/_envDatasetsBased.tpl +++ b/chart/templates/_envDatasetsBased.tpl @@ -8,7 +8,5 @@ value: "/tmp/modules-cache" - name: NUMBA_CACHE_DIR value: "/tmp/numba-cache" -- name: CONTENT_MAX_BYTES - value: {{ .Values.datasetsBased.contentMaxBytes}} {{- end -}} diff --git a/chart/templates/_envWorker.tpl b/chart/templates/_envWorker.tpl new file mode 100644 index 0000000000..daa35eb481 --- /dev/null +++ b/chart/templates/_envWorker.tpl @@ -0,0 +1,19 @@ +# SPDX-License-Identifier: Apache-2.0 +# Copyright 2022 The HuggingFace Authors. + +{{- define "envWorker" -}} +- name: WORKER_CONTENT_MAX_BYTES + value: {{ .Values.worker.contentMaxBytes | quote}} + # WORKER_ENDPOINT is not defined here, it's hard-coded in the template +- name: WORKER_MAX_DISK_USAGE_PCT + value: {{ .Values.worker.maxDiskUsagePct | quote }} +- name: WORKER_MAX_LOAD_PCT + value: {{ .Values.worker.maxLoadPct | quote }} +- name: WORKER_MAX_MEMORY_PCT + value: {{ .Values.worker.maxMemoryPct | quote }} +- name: WORKER_SLEEP_SECONDS + value: {{ .Values.worker.sleepSeconds | quote }} +- name: WORKER_STORAGE_PATHS + value: {{ .Values.assets.storageDirectory | quote }} + # ^ note: for datasets_based workers, the datasets cache is automatically added, so no need to add it here +{{- end -}} diff --git a/chart/templates/_envWorkerLoop.tpl b/chart/templates/_envWorkerLoop.tpl deleted file mode 100644 index e4d922bcc5..0000000000 --- a/chart/templates/_envWorkerLoop.tpl +++ /dev/null @@ -1,13 +0,0 @@ -# SPDX-License-Identifier: Apache-2.0 -# Copyright 2022 The HuggingFace Authors. 
- -{{- define "envWorkerLoop" -}} -- name: WORKER_LOOP_MAX_DISK_USAGE_PCT - value: {{ .Values.workerLoop.maxDiskUsagePct | quote }} -- name: WORKER_LOOP_MAX_LOAD_PCT - value: {{ .Values.workerLoop.maxLoadPct | quote }} -- name: WORKER_LOOP_MAX_MEMORY_PCT - value: {{ .Values.workerLoop.maxMemoryPct | quote }} -- name: WORKER_LOOP_SLEEP_SECONDS - value: {{ .Values.workerLoop.sleepSeconds | quote }} -{{- end -}} diff --git a/chart/templates/_helpers.tpl b/chart/templates/_helpers.tpl index 84a19bf8d0..3ad176a468 100644 --- a/chart/templates/_helpers.tpl +++ b/chart/templates/_helpers.tpl @@ -82,7 +82,7 @@ imagePullSecrets: {{ include "datasetsServer.images.image" (dict "imageRoot" .Values.images.services.api "global" .Values.global.huggingface) }} {{- end -}} -{{- define "workers.datasetsBased.image" -}} +{{- define "services.worker.image" -}} {{ include "datasetsServer.images.image" (dict "imageRoot" .Values.images.services.worker "global" .Values.global.huggingface) }} {{- end -}} @@ -263,4 +263,4 @@ Return the HUB url {{- $hubName := ((list $.Release.Name "hub") | join "-") | trunc 63 | trimSuffix "-" -}} http://{{ $hubName }} {{- end -}} -{{- end -}} \ No newline at end of file +{{- end -}} diff --git a/chart/templates/worker/config-names/_container.tpl b/chart/templates/worker/config-names/_container.tpl index 3a87636b61..def7afb6a2 100644 --- a/chart/templates/worker/config-names/_container.tpl +++ b/chart/templates/worker/config-names/_container.tpl @@ -3,21 +3,19 @@ {{- define "containerWorkerConfigNames" -}} - name: "{{ include "name" . }}-worker-config-names" - image: {{ include "workers.datasetsBased.image" . }} + image: {{ include "services.worker.image" . }} imagePullPolicy: {{ .Values.images.pullPolicy }} env: - - name: DATASETS_BASED_ENDPOINT + - name: WORKER_ENDPOINT value: "/config-names" # ^ hard-coded {{ include "envCache" . | nindent 2 }} {{ include "envQueue" . | nindent 2 }} {{ include "envCommon" . | nindent 2 }} - {{ include "envWorkerLoop" . | nindent 2 }} + {{ include "envWorker" . | nindent 2 }} {{ include "envDatasetsBased" . | nindent 2 }} - name: DATASETS_BASED_HF_DATASETS_CACHE value: {{ printf "%s/config-names/datasets" .Values.cacheDirectory | quote }} - - name: DATASETS_BASED_CONTENT_MAX_BYTES - value: {{ .Values.datasetsBased.contentMaxBytes | quote}} - name: QUEUE_MAX_JOBS_PER_NAMESPACE # value: {{ .Values.queue.maxJobsPerNamespace | quote }} # overridden diff --git a/chart/templates/worker/dataset-info/_container.tpl b/chart/templates/worker/dataset-info/_container.tpl index 03bb31c028..3628349786 100644 --- a/chart/templates/worker/dataset-info/_container.tpl +++ b/chart/templates/worker/dataset-info/_container.tpl @@ -3,18 +3,16 @@ {{- define "containerWorkerDatasetInfo" -}} - name: "{{ include "name" . }}-worker-dataset-info" - image: {{ include "workers.datasetsBased.image" . }} + image: {{ include "services.worker.image" . }} imagePullPolicy: {{ .Values.images.pullPolicy }} env: - - name: DATASETS_BASED_ENDPOINT + - name: WORKER_ENDPOINT value: "/dataset-info" # ^ hard-coded {{ include "envCache" . | nindent 2 }} {{ include "envQueue" . | nindent 2 }} {{ include "envCommon" . | nindent 2 }} - {{ include "envWorkerLoop" . | nindent 2 }} - - name: DATASETS_BASED_CONTENT_MAX_BYTES - value: {{ .Values.datasetsBased.contentMaxBytes | quote}} + {{ include "envWorker" . 
| nindent 2 }} - name: QUEUE_MAX_JOBS_PER_NAMESPACE # value: {{ .Values.queue.maxJobsPerNamespace | quote }} # overridden diff --git a/chart/templates/worker/first-rows/_container.tpl b/chart/templates/worker/first-rows/_container.tpl index 83e2fd723a..884b136fd1 100644 --- a/chart/templates/worker/first-rows/_container.tpl +++ b/chart/templates/worker/first-rows/_container.tpl @@ -3,25 +3,20 @@ {{- define "containerWorkerFirstRows" -}} - name: "{{ include "name" . }}-worker-first-rows" - image: {{ include "workers.datasetsBased.image" . }} + image: {{ include "services.worker.image" . }} imagePullPolicy: {{ .Values.images.pullPolicy }} env: - - name: DATASETS_BASED_ENDPOINT + - name: WORKER_ENDPOINT value: "/first-rows" # ^ hard-coded {{ include "envAssets" . | nindent 2 }} {{ include "envCache" . | nindent 2 }} {{ include "envQueue" . | nindent 2 }} {{ include "envCommon" . | nindent 2 }} - {{ include "envWorkerLoop" . | nindent 2 }} - - name: WORKER_LOOP_STORAGE_PATHS - value: {{ .Values.assets.storageDirectory | quote }} - # ^ note: the datasets cache is automatically added, so no need to add it here + {{ include "envWorker" . | nindent 2 }} {{ include "envDatasetsBased" . | nindent 2 }} - name: DATASETS_BASED_HF_DATASETS_CACHE value: {{ printf "%s/first-rows/datasets" .Values.cacheDirectory | quote }} - - name: DATASETS_BASED_CONTENT_MAX_BYTES - value: {{ .Values.datasetsBased.contentMaxBytes | quote}} - name: QUEUE_MAX_JOBS_PER_NAMESPACE # value: {{ .Values.queue.maxJobsPerNamespace | quote }} # overridden diff --git a/chart/templates/worker/parquet-and-dataset-info/_container.tpl b/chart/templates/worker/parquet-and-dataset-info/_container.tpl index 2a0f4b858a..ef54ff92ef 100644 --- a/chart/templates/worker/parquet-and-dataset-info/_container.tpl +++ b/chart/templates/worker/parquet-and-dataset-info/_container.tpl @@ -3,21 +3,19 @@ {{- define "containerWorkerParquetAndDatasetInfo" -}} - name: "{{ include "name" . }}-worker-parquet-and-dataset-info" - image: {{ include "workers.datasetsBased.image" . }} + image: {{ include "services.worker.image" . }} imagePullPolicy: {{ .Values.images.pullPolicy }} env: - - name: DATASETS_BASED_ENDPOINT + - name: WORKER_ENDPOINT value: "/parquet-and-dataset-info" # ^ hard-coded {{ include "envCache" . | nindent 2 }} {{ include "envQueue" . | nindent 2 }} {{ include "envCommon" . | nindent 2 }} - {{ include "envWorkerLoop" . | nindent 2 }} + {{ include "envWorker" . | nindent 2 }} {{ include "envDatasetsBased" . | nindent 2 }} - name: DATASETS_BASED_HF_DATASETS_CACHE value: {{ printf "%s/parquet-and-dataset-info/datasets" .Values.cacheDirectory | quote }} - - name: DATASETS_BASED_CONTENT_MAX_BYTES - value: {{ .Values.datasetsBased.contentMaxBytes | quote}} - name: QUEUE_MAX_JOBS_PER_NAMESPACE # value: {{ .Values.queue.maxJobsPerNamespace | quote }} # overridden diff --git a/chart/templates/worker/parquet/_container.tpl b/chart/templates/worker/parquet/_container.tpl index ec04cbeac3..33613ab55d 100644 --- a/chart/templates/worker/parquet/_container.tpl +++ b/chart/templates/worker/parquet/_container.tpl @@ -3,18 +3,16 @@ {{- define "containerWorkerParquet" -}} - name: "{{ include "name" . }}-worker-parquet" - image: {{ include "workers.datasetsBased.image" . }} + image: {{ include "services.worker.image" . }} imagePullPolicy: {{ .Values.images.pullPolicy }} env: - - name: DATASETS_BASED_ENDPOINT + - name: WORKER_ENDPOINT value: "/parquet" # ^ hard-coded {{ include "envCache" . | nindent 2 }} {{ include "envQueue" . 
| nindent 2 }} {{ include "envCommon" . | nindent 2 }} - {{ include "envWorkerLoop" . | nindent 2 }} - - name: DATASETS_BASED_CONTENT_MAX_BYTES - value: {{ .Values.datasetsBased.contentMaxBytes | quote}} + {{ include "envWorker" . | nindent 2 }} - name: QUEUE_MAX_JOBS_PER_NAMESPACE # value: {{ .Values.queue.maxJobsPerNamespace | quote }} # overridden diff --git a/chart/templates/worker/sizes/_container.tpl b/chart/templates/worker/sizes/_container.tpl index a575818d45..e42ab78345 100644 --- a/chart/templates/worker/sizes/_container.tpl +++ b/chart/templates/worker/sizes/_container.tpl @@ -3,18 +3,16 @@ {{- define "containerWorkerSizes" -}} - name: "{{ include "name" . }}-worker-sizes" - image: {{ include "workers.datasetsBased.image" . }} + image: {{ include "services.worker.image" . }} imagePullPolicy: {{ .Values.images.pullPolicy }} env: - - name: DATASETS_BASED_ENDPOINT + - name: WORKER_ENDPOINT value: "/sizes" # ^ hard-coded {{ include "envCache" . | nindent 2 }} {{ include "envQueue" . | nindent 2 }} {{ include "envCommon" . | nindent 2 }} - {{ include "envWorkerLoop" . | nindent 2 }} - - name: DATASETS_BASED_CONTENT_MAX_BYTES - value: {{ .Values.datasetsBased.contentMaxBytes | quote}} + {{ include "envWorker" . | nindent 2 }} - name: QUEUE_MAX_JOBS_PER_NAMESPACE # value: {{ .Values.queue.maxJobsPerNamespace | quote }} # overridden diff --git a/chart/templates/worker/split-names/_container.tpl b/chart/templates/worker/split-names/_container.tpl index 53f0d88c02..694349fbf8 100644 --- a/chart/templates/worker/split-names/_container.tpl +++ b/chart/templates/worker/split-names/_container.tpl @@ -3,21 +3,19 @@ {{- define "containerWorkerSplitNames" -}} - name: "{{ include "name" . }}-worker-split-names" - image: {{ include "workers.datasetsBased.image" . }} + image: {{ include "services.worker.image" . }} imagePullPolicy: {{ .Values.images.pullPolicy }} env: - - name: DATASETS_BASED_ENDPOINT + - name: WORKER_ENDPOINT value: "/split-names" # ^ hard-coded {{ include "envCache" . | nindent 2 }} {{ include "envQueue" . | nindent 2 }} {{ include "envCommon" . | nindent 2 }} - {{ include "envWorkerLoop" . | nindent 2 }} + {{ include "envWorker" . | nindent 2 }} {{ include "envDatasetsBased" . | nindent 2 }} - name: DATASETS_BASED_HF_DATASETS_CACHE value: {{ printf "%s/split-names/datasets" .Values.cacheDirectory | quote }} - - name: DATASETS_BASED_CONTENT_MAX_BYTES - value: {{ .Values.datasetsBased.contentMaxBytes | quote}} - name: QUEUE_MAX_JOBS_PER_NAMESPACE # value: {{ .Values.queue.maxJobsPerNamespace | quote }} # overridden diff --git a/chart/templates/worker/splits/_container.tpl b/chart/templates/worker/splits/_container.tpl index d862a20a68..c5211441f1 100644 --- a/chart/templates/worker/splits/_container.tpl +++ b/chart/templates/worker/splits/_container.tpl @@ -3,21 +3,19 @@ {{- define "containerWorkerSplits" -}} - name: "{{ include "name" . }}-worker-splits" - image: {{ include "workers.datasetsBased.image" . }} + image: {{ include "services.worker.image" . }} imagePullPolicy: {{ .Values.images.pullPolicy }} env: - - name: DATASETS_BASED_ENDPOINT + - name: WORKER_ENDPOINT value: "/splits" # ^ hard-coded {{ include "envCache" . | nindent 2 }} {{ include "envQueue" . | nindent 2 }} {{ include "envCommon" . | nindent 2 }} - {{ include "envWorkerLoop" . | nindent 2 }} + {{ include "envWorker" . | nindent 2 }} {{ include "envDatasetsBased" . 
| nindent 2 }} - name: DATASETS_BASED_HF_DATASETS_CACHE value: {{ printf "%s/splits/datasets" .Values.cacheDirectory | quote }} - - name: DATASETS_BASED_CONTENT_MAX_BYTES - value: {{ .Values.datasetsBased.contentMaxBytes | quote}} - name: QUEUE_MAX_JOBS_PER_NAMESPACE # value: {{ .Values.queue.maxJobsPerNamespace | quote }} # overridden diff --git a/chart/values.yaml b/chart/values.yaml index 3b1aff698c..b451368c73 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -94,7 +94,9 @@ queue: # Name of the mongo db database used to store the jobs queue mongoDatabase: "datasets_server_queue" -workerLoop: +worker: + # maximum size in bytes of the response content computed by a worker + contentMaxBytes: "10_000_000" # maximum disk usage of every storage disk in the list (in percentage) to allow a job to start. Set to 0 to disable the test. maxDiskUsagePct: 90 # Max CPU load (%) - if reached, sleeps until it comes back under the limit. Set to 0 to disable the test. @@ -376,7 +378,3 @@ sizes: limits: cpu: 0 tolerations: [] - -# --- datasets_based --- -datasetsBased: - contentMaxBytes: "10_000_000" diff --git a/services/worker/README.md b/services/worker/README.md index 04211f8c04..35df2da6a5 100644 --- a/services/worker/README.md +++ b/services/worker/README.md @@ -6,19 +6,29 @@ Use environment variables to configure the worker. The prefix of each environment variable gives its scope. +## Worker configuration + +Set environment variables to configure the worker. + +- `WORKER_CONTENT_MAX_BYTES`: the maximum size in bytes of the response content computed by a worker (to prevent returning big responses in the REST API). Defaults to `10_000_000`. +- `WORKER_ENDPOINT`: the endpoint on which the worker will work (pre-compute and cache the response). The same worker is used for different endpoints to reuse shared code and dependencies. But at runtime, the worker is assigned only one endpoint. Allowed values: `/splits`, `/first-rows`, `/parquet-and-dataset-info`, etc. Defaults to `/config-names`. +- `WORKER_MAX_DISK_USAGE_PCT`: maximum disk usage of every storage disk in the list (in percentage) to allow a job to start. Set to 0 to disable the test. Defaults to 90. +- `WORKER_MAX_LOAD_PCT`: maximum load of the machine (in percentage: the max between the 1m load and the 5m load divided by the number of CPUs \*100) allowed to start a job. Set to 0 to disable the test. Defaults to 70. +- `WORKER_MAX_MEMORY_PCT`: maximum memory (RAM + SWAP) usage of the machine (in percentage) allowed to start a job. Set to 0 to disable the test. Defaults to 80. +- `WORKER_SLEEP_SECONDS`: wait duration in seconds at each loop iteration before checking if resources are available and processing a job if any is available. Note that the loop doesn't wait just after finishing a job: the next job is immediately processed. Defaults to `15`. +- `WORKER_STORAGE_PATHS`: comma-separated list of paths to check for disk usage. Defaults to empty. + ### Datasets based worker Set environment variables to configure the datasets-based worker (`DATASETS_BASED_` prefix): -- `DATASETS_BASED_ENDPOINT`: the endpoint on which the worker will work (pre-compute and cache the response). The same worker is used for different endpoints to reuse shared code and dependencies. But at runtime, the worker is assigned only one endpoint. Allowed values: `/splits`, `/first_rows`, and ` /parquet-and-dataset-info`. Defaults to `/splits`. - `DATASETS_BASED_HF_DATASETS_CACHE`: directory where the `datasets` library will store the cached datasets' data.
If not set, the datasets library will choose the default location. Defaults to None. -- `DATASETS_BASED_CONTENT_MAX_BYTES`: the maximum size in bytes of the response content computed by a worker (to prevent returning big responses in the REST API). Defaults to `10_000_000`. Also, set the modules cache configuration for the datasets-based worker. See [../../libs/libcommon/README.md](../../libs/libcommon/README.md). Note that this variable has no `DATASETS_BASED_` prefix: - `HF_MODULES_CACHE`: directory where the `datasets` library will store the cached dataset scripts. If not set, the datasets library will choose the default location. Defaults to None. -Note that both directories will be appended to `WORKER_LOOP_STORAGE_PATHS` (see [../../libs/libcommon/README.md](../../libs/libcommon/README.md)) to hold the workers when the disk is full. +Note that both directories will be appended to `WORKER_STORAGE_PATHS` (see [../../libs/libcommon/README.md](../../libs/libcommon/README.md)) to hold the workers when the disk is full. ### Numba library @@ -26,7 +36,7 @@ Numba requires setting the `NUMBA_CACHE_DIR` environment variable to a writable - `NUMBA_CACHE_DIR`: directory where the `numba` decorators (used by `librosa`) can write cache. -Note that this directory will be appended to `WORKER_LOOP_STORAGE_PATHS` (see [../../libs/libcommon/README.md](../../libs/libcommon/README.md)) to hold the workers when the disk is full. +Note that this directory will be appended to `WORKER_STORAGE_PATHS` (see [../../libs/libcommon/README.md](../../libs/libcommon/README.md)) to hold the workers when the disk is full. ### Huggingface_hub library @@ -36,7 +46,7 @@ If the Hub is not https://huggingface.co (i.e., if you set the `COMMON_HF_ENDPOI ### First rows worker -Only needed when the `DATASETS_BASED_ENDPOINT` is set to `/first-rows`. +Only needed when the `WORKER_ENDPOINT` is set to `/first-rows`. Set environment variables to configure the first rows worker (`FIRST_ROWS_` prefix): @@ -50,7 +60,7 @@ Also, set the assets-related configuration for the first-rows worker. See [../.. ### Parquet and dataset info worker -Only needed when the `DATASETS_BASED_ENDPOINT` is set to `/parquet-and-dataset-info`. +Only needed when the `WORKER_ENDPOINT` is set to `/parquet-and-dataset-info`. Set environment variables to configure the parquet worker (`PARQUET_AND_DATASET_INFO_` prefix): @@ -70,13 +80,3 @@ The splits worker does not need any additional configuration. ### Common See [../../libs/libcommon/README.md](../../libs/libcommon/README.md) for more information about the common configuration. - -## Worker loop configuration - -Set environment variables to configure the worker loop that processes the queue. - -- `WORKER_LOOP_MAX_DISK_USAGE_PCT`: maximum disk usage of every storage disk in the list (in percentage) to allow a job to start. Set to 0 to disable the test. Defaults to 90. -- `WORKER_LOOP_MAX_LOAD_PCT`: maximum load of the machine (in percentage: the max between the 1m load and the 5m load divided by the number of CPUs \*100) allowed to start a job. Set to 0 to disable the test. Defaults to 70. -- `WORKER_LOOP_MAX_MEMORY_PCT`: maximum memory (RAM + SWAP) usage of the machine (in percentage) allowed to start a job. Set to 0 to disable the test. Defaults to 80. -- `WORKER_LOOP_SLEEP_SECONDS`: wait duration in seconds at each loop iteration before checking if resources are available and processing a job if any is available. Note that the loop doesn't wait just after finishing a job: the next job is immediately processed. 
Defaults to `15`. -- `WORKER_LOOP_STORAGE_PATHS`: comma-separated list of paths to check for disk usage. Defaults to empty. diff --git a/services/worker/src/worker/config.py b/services/worker/src/worker/config.py index 1cacd1c32a..545f790146 100644 --- a/services/worker/src/worker/config.py +++ b/services/worker/src/worker/config.py @@ -13,10 +13,12 @@ QueueConfig, ) -WORKER_LOOP_MAX_DISK_USAGE_PCT = 90 -WORKER_LOOP_MAX_LOAD_PCT = 70 -WORKER_LOOP_MAX_MEMORY_PCT = 80 -WORKER_LOOP_SLEEP_SECONDS = 15 +WORKER_CONTENT_MAX_BYTES = 10_000_000 +WORKER_ENDPOINT = "/config-names" +WORKER_MAX_DISK_USAGE_PCT = 90 +WORKER_MAX_LOAD_PCT = 70 +WORKER_MAX_MEMORY_PCT = 80 +WORKER_SLEEP_SECONDS = 15 def get_empty_str_list() -> List[str]: @@ -24,45 +26,43 @@ def get_empty_str_list() -> List[str]: @dataclass(frozen=True) -class LoopConfig: - max_disk_usage_pct: int = WORKER_LOOP_MAX_DISK_USAGE_PCT - max_load_pct: int = WORKER_LOOP_MAX_LOAD_PCT - max_memory_pct: int = WORKER_LOOP_MAX_MEMORY_PCT - sleep_seconds: int = WORKER_LOOP_SLEEP_SECONDS +class WorkerConfig: + content_max_bytes: int = WORKER_CONTENT_MAX_BYTES + endpoint: str = WORKER_ENDPOINT + max_disk_usage_pct: int = WORKER_MAX_DISK_USAGE_PCT + max_load_pct: int = WORKER_MAX_LOAD_PCT + max_memory_pct: int = WORKER_MAX_MEMORY_PCT + sleep_seconds: int = WORKER_SLEEP_SECONDS storage_paths: List[str] = field(default_factory=get_empty_str_list) @classmethod - def from_env(cls) -> "LoopConfig": + def from_env(cls) -> "WorkerConfig": env = Env(expand_vars=True) - with env.prefixed("WORKER_LOOP_"): + with env.prefixed("WORKER_"): return cls( - max_disk_usage_pct=env.int(name="MAX_DISK_USAGE_PCT", default=WORKER_LOOP_MAX_DISK_USAGE_PCT), - max_load_pct=env.int(name="MAX_LOAD_PCT", default=WORKER_LOOP_MAX_LOAD_PCT), - max_memory_pct=env.int(name="MAX_MEMORY_PCT", default=WORKER_LOOP_MAX_MEMORY_PCT), - sleep_seconds=env.int(name="SLEEP_SECONDS", default=WORKER_LOOP_SLEEP_SECONDS), + content_max_bytes=env.int(name="CONTENT_MAX_BYTES", default=WORKER_CONTENT_MAX_BYTES), + endpoint=env.str(name="ENDPOINT", default=WORKER_ENDPOINT), + max_disk_usage_pct=env.int(name="MAX_DISK_USAGE_PCT", default=WORKER_MAX_DISK_USAGE_PCT), + max_load_pct=env.int(name="MAX_LOAD_PCT", default=WORKER_MAX_LOAD_PCT), + max_memory_pct=env.int(name="MAX_MEMORY_PCT", default=WORKER_MAX_MEMORY_PCT), + sleep_seconds=env.int(name="SLEEP_SECONDS", default=WORKER_SLEEP_SECONDS), storage_paths=env.list(name="STORAGE_PATHS", default=get_empty_str_list()), ) -DATASETS_BASED_ENDPOINT = "/config-names" DATASETS_BASED_HF_DATASETS_CACHE = None -DATASETS_BASED_CONTENT_MAX_BYTES = 10_000_000 @dataclass(frozen=True) class DatasetsBasedConfig: - endpoint: str = DATASETS_BASED_ENDPOINT hf_datasets_cache: Optional[str] = DATASETS_BASED_HF_DATASETS_CACHE - content_max_bytes: int = DATASETS_BASED_CONTENT_MAX_BYTES @classmethod def from_env(cls) -> "DatasetsBasedConfig": env = Env(expand_vars=True) with env.prefixed("DATASETS_BASED_"): return cls( - endpoint=env.str(name="ENDPOINT", default=DATASETS_BASED_ENDPOINT), hf_datasets_cache=env.str(name="HF_DATASETS_CACHE", default=DATASETS_BASED_HF_DATASETS_CACHE), - content_max_bytes=env.int(name="CONTENT_MAX_BYTES", default=DATASETS_BASED_CONTENT_MAX_BYTES), ) @@ -154,7 +154,7 @@ class AppConfig: numba: NumbaConfig = field(default_factory=NumbaConfig) processing_graph: ProcessingGraphConfig = field(default_factory=ProcessingGraphConfig) queue: QueueConfig = field(default_factory=QueueConfig) - loop: LoopConfig = field(default_factory=LoopConfig) + worker: 
WorkerConfig = field(default_factory=WorkerConfig) @classmethod def from_env(cls) -> "AppConfig": @@ -166,5 +166,5 @@ def from_env(cls) -> "AppConfig": numba=NumbaConfig.from_env(), processing_graph=ProcessingGraphConfig.from_env(), queue=QueueConfig.from_env(), - loop=LoopConfig.from_env(), + worker=WorkerConfig.from_env(), ) diff --git a/services/worker/src/worker/job_runner.py b/services/worker/src/worker/job_runner.py index dd00b5a7d3..942e7dbe6f 100644 --- a/services/worker/src/worker/job_runner.py +++ b/services/worker/src/worker/job_runner.py @@ -23,7 +23,7 @@ from libcommon.utils import orjson_dumps from packaging import version -from worker.config import DatasetsBasedConfig +from worker.config import WorkerConfig GeneralJobRunnerErrorCode = Literal[ "ConfigNotFoundError", @@ -157,7 +157,7 @@ class JobRunner(ABC): split: Optional[str] = None force: bool priority: Priority - datasets_based_config: DatasetsBasedConfig + worker_config: WorkerConfig common_config: CommonConfig processing_step: ProcessingStep @@ -175,7 +175,7 @@ def __init__( self, job_info: JobInfo, common_config: CommonConfig, - datasets_based_config: DatasetsBasedConfig, + worker_config: WorkerConfig, processing_step: ProcessingStep, ) -> None: self.job_type = job_info["type"] @@ -186,7 +186,7 @@ def __init__( self.force = job_info["force"] self.priority = job_info["priority"] self.common_config = common_config - self.datasets_based_config = datasets_based_config + self.worker_config = worker_config self.processing_step = processing_step self.setup() @@ -323,10 +323,10 @@ def process( content = self.compute() # Validate content size - if len(orjson_dumps(content)) > self.datasets_based_config.content_max_bytes: + if len(orjson_dumps(content)) > self.worker_config.content_max_bytes: raise TooBigContentError( "The computed response content exceeds the supported size in bytes" - f" ({self.datasets_based_config.content_max_bytes})." + f" ({self.worker_config.content_max_bytes})." 
) finally: # ensure the post_compute hook is called even if the compute raises an exception diff --git a/services/worker/src/worker/job_runner_factory.py b/services/worker/src/worker/job_runner_factory.py index 89cf577ce9..d793ccfa3c 100644 --- a/services/worker/src/worker/job_runner_factory.py +++ b/services/worker/src/worker/job_runner_factory.py @@ -96,21 +96,21 @@ def _create_job_runner(self, job_info: JobInfo) -> JobRunner: return ParquetJobRunner( job_info=job_info, common_config=self.app_config.common, - datasets_based_config=self.app_config.datasets_based, + worker_config=self.app_config.worker, processing_step=processing_step, ) if job_type == DatasetInfoJobRunner.get_job_type(): return DatasetInfoJobRunner( job_info=job_info, common_config=self.app_config.common, - datasets_based_config=self.app_config.datasets_based, + worker_config=self.app_config.worker, processing_step=processing_step, ) if job_type == SizesJobRunner.get_job_type(): return SizesJobRunner( job_info=job_info, common_config=self.app_config.common, - datasets_based_config=self.app_config.datasets_based, + worker_config=self.app_config.worker, processing_step=processing_step, ) supported_job_types = [ diff --git a/services/worker/src/worker/job_runners/_datasets_based_job_runner.py b/services/worker/src/worker/job_runners/_datasets_based_job_runner.py index 82119cbb41..70751adb27 100644 --- a/services/worker/src/worker/job_runners/_datasets_based_job_runner.py +++ b/services/worker/src/worker/job_runners/_datasets_based_job_runner.py @@ -34,7 +34,7 @@ def __init__( super().__init__( job_info=job_info, common_config=app_config.common, - datasets_based_config=app_config.datasets_based, + worker_config=app_config.worker, processing_step=processing_step, ) self.datasets_based_config = app_config.datasets_based diff --git a/services/worker/src/worker/loop.py b/services/worker/src/worker/loop.py index 7860ab3606..32b11a56f1 100644 --- a/services/worker/src/worker/loop.py +++ b/services/worker/src/worker/loop.py @@ -9,7 +9,7 @@ from libcommon.queue import EmptyQueueError, Queue from psutil import cpu_count, disk_usage, getloadavg, swap_memory, virtual_memory -from worker.config import LoopConfig +from worker.config import WorkerConfig from worker.job_runner_factory import BaseJobRunnerFactory @@ -29,19 +29,19 @@ class Loop: job_runner_factory (`JobRunnerFactory`): The job runner factory that will create a job runner for each job. Must be able to process the jobs of the queue. - loop_config (`LoopConfig`): - Loop configuration. + worker_config (`WorkerConfig`): + Worker configuration. 
""" library_cache_paths: set[str] queue: Queue job_runner_factory: BaseJobRunnerFactory - loop_config: LoopConfig + worker_config: WorkerConfig storage_paths: set[str] = field(init=False) def __post_init__(self) -> None: - self.storage_paths = set(self.loop_config.storage_paths).union(self.library_cache_paths) + self.storage_paths = set(self.worker_config.storage_paths).union(self.library_cache_paths) def log(self, level: int, msg: str) -> None: logging.log(level=level, msg=f"[{self.queue.type}] {msg}") @@ -59,35 +59,35 @@ def exception(self, msg: str) -> None: self.log(level=logging.ERROR, msg=msg) def has_memory(self) -> bool: - if self.loop_config.max_memory_pct <= 0: + if self.worker_config.max_memory_pct <= 0: return True virtual_memory_used: int = virtual_memory().used # type: ignore virtual_memory_total: int = virtual_memory().total # type: ignore percent = (swap_memory().used + virtual_memory_used) / (swap_memory().total + virtual_memory_total) - ok = percent < self.loop_config.max_memory_pct + ok = percent < self.worker_config.max_memory_pct if not ok: self.info( - f"memory usage (RAM + SWAP) is too high: {percent:.0f}% - max is {self.loop_config.max_memory_pct}%" + f"memory usage (RAM + SWAP) is too high: {percent:.0f}% - max is {self.worker_config.max_memory_pct}%" ) return ok def has_cpu(self) -> bool: - if self.loop_config.max_load_pct <= 0: + if self.worker_config.max_load_pct <= 0: return True load_pct = max(getloadavg()[:2]) / cpu_count() * 100 # ^ only current load and 5m load. 15m load is not relevant to decide to launch a new job - ok = load_pct < self.loop_config.max_load_pct + ok = load_pct < self.worker_config.max_load_pct if not ok: - self.info(f"cpu load is too high: {load_pct:.0f}% - max is {self.loop_config.max_load_pct}%") + self.info(f"cpu load is too high: {load_pct:.0f}% - max is {self.worker_config.max_load_pct}%") return ok def has_storage(self) -> bool: - if self.loop_config.max_disk_usage_pct <= 0: + if self.worker_config.max_disk_usage_pct <= 0: return True for path in self.storage_paths: try: usage = disk_usage(path) - if usage.percent >= self.loop_config.max_disk_usage_pct: + if usage.percent >= self.worker_config.max_disk_usage_pct: return False except Exception: # if we can't get the disk usage, we let the process continue @@ -100,7 +100,7 @@ def has_resources(self) -> bool: def sleep(self) -> None: jitter = 0.75 + random.random() / 2 # nosec # ^ between 0.75 and 1.25 - duration = self.loop_config.sleep_seconds * jitter + duration = self.worker_config.sleep_seconds * jitter self.debug(f"sleep during {duration:.2f} seconds") time.sleep(duration) diff --git a/services/worker/src/worker/main.py b/services/worker/src/worker/main.py index 6de77fae53..b1ae25d697 100644 --- a/services/worker/src/worker/main.py +++ b/services/worker/src/worker/main.py @@ -20,7 +20,7 @@ assets_directory = init_assets_dir(directory=app_config.assets.storage_directory) processing_graph = ProcessingGraph(app_config.processing_graph.specification) - processing_step = processing_graph.get_step(app_config.datasets_based.endpoint) + processing_step = processing_graph.get_step(app_config.worker.endpoint) with ( LibrariesResource( @@ -51,6 +51,6 @@ queue=queue, library_cache_paths=libraries_resource.storage_paths, job_runner_factory=job_runner_factory, - loop_config=app_config.loop, + worker_config=app_config.worker, ) loop.run() diff --git a/services/worker/tests/conftest.py b/services/worker/tests/conftest.py index 54048d0b38..e8686ecac5 100644 --- a/services/worker/tests/conftest.py 
+++ b/services/worker/tests/conftest.py @@ -53,7 +53,7 @@ def set_env_vars(datasets_cache_directory: Path, modules_cache_directory: Path) mp.setenv("PARQUET_AND_DATASET_INFO_COMMITTER_HF_TOKEN", CI_USER_TOKEN) mp.setenv("DATASETS_BASED_HF_DATASETS_CACHE", str(datasets_cache_directory)) mp.setenv("HF_MODULES_CACHE", str(modules_cache_directory)) - mp.setenv("DATASETS_BASED_CONTENT_MAX_BYTES", "10_000_000") + mp.setenv("WORKER_CONTENT_MAX_BYTES", "10_000_000") yield mp mp.undo() diff --git a/services/worker/tests/job_runners/test__datasets_based_worker.py b/services/worker/tests/job_runners/test__datasets_based_worker.py index 275e48bcc3..108c67d2ca 100644 --- a/services/worker/tests/job_runners/test__datasets_based_worker.py +++ b/services/worker/tests/job_runners/test__datasets_based_worker.py @@ -171,7 +171,7 @@ def test_process_big_content(hub_datasets: HubDatasets, app_config: AppConfig, g dataset, config, split, - app_config=replace(app_config, datasets_based=replace(app_config.datasets_based, content_max_bytes=10)), + app_config=replace(app_config, worker=replace(app_config.worker, content_max_bytes=10)), ) assert worker.process() is False diff --git a/services/worker/tests/job_runners/test_dataset_info.py b/services/worker/tests/job_runners/test_dataset_info.py index a0dfd6921e..a5010afb1b 100644 --- a/services/worker/tests/job_runners/test_dataset_info.py +++ b/services/worker/tests/job_runners/test_dataset_info.py @@ -46,7 +46,7 @@ def _get_job_runner( "priority": Priority.NORMAL, }, common_config=app_config.common, - datasets_based_config=app_config.datasets_based, + worker_config=app_config.worker, processing_step=ProcessingStep( endpoint=DatasetInfoJobRunner.get_job_type(), input_type="dataset", diff --git a/services/worker/tests/job_runners/test_parquet.py b/services/worker/tests/job_runners/test_parquet.py index 16b7a25cbe..ac4e4a070a 100644 --- a/services/worker/tests/job_runners/test_parquet.py +++ b/services/worker/tests/job_runners/test_parquet.py @@ -46,7 +46,7 @@ def _get_job_runner( "priority": Priority.NORMAL, }, common_config=app_config.common, - datasets_based_config=app_config.datasets_based, + worker_config=app_config.worker, processing_step=ProcessingStep( endpoint=ParquetJobRunner.get_job_type(), input_type="dataset", diff --git a/services/worker/tests/job_runners/test_sizes.py b/services/worker/tests/job_runners/test_sizes.py index 83326b9709..858fe019ca 100644 --- a/services/worker/tests/job_runners/test_sizes.py +++ b/services/worker/tests/job_runners/test_sizes.py @@ -46,7 +46,7 @@ def _get_job_runner( "priority": Priority.NORMAL, }, common_config=app_config.common, - datasets_based_config=app_config.datasets_based, + worker_config=app_config.worker, processing_step=ProcessingStep( endpoint=SizesJobRunner.get_job_type(), input_type="dataset", diff --git a/services/worker/tests/test_job_runner.py b/services/worker/tests/test_job_runner.py index fa0ca682b8..686acc723a 100644 --- a/services/worker/tests/test_job_runner.py +++ b/services/worker/tests/test_job_runner.py @@ -9,7 +9,7 @@ from libcommon.resources import CacheMongoResource, QueueMongoResource from libcommon.simple_cache import SplitFullName, upsert_response -from worker.config import DatasetsBasedConfig +from worker.config import WorkerConfig from worker.job_runner import ERROR_CODES_TO_RETRY, JobRunner @@ -78,7 +78,7 @@ def test_compare_major_version( }, processing_step=test_processing_step, common_config=CommonConfig(), - datasets_based_config=DatasetsBasedConfig(), + worker_config=WorkerConfig(), ) 
if should_raise: with pytest.raises(Exception): @@ -196,7 +196,7 @@ def test_should_skip_job( }, processing_step=test_processing_step, common_config=CommonConfig(), - datasets_based_config=DatasetsBasedConfig(), + worker_config=WorkerConfig(), ) if cache_entry: upsert_response( @@ -237,7 +237,7 @@ def test_check_type( }, processing_step=test_processing_step, common_config=CommonConfig(), - datasets_based_config=DatasetsBasedConfig(), + worker_config=WorkerConfig(), ) another_processing_step = ProcessingStep( @@ -262,7 +262,7 @@ def test_check_type( }, processing_step=another_processing_step, common_config=CommonConfig(), - datasets_based_config=DatasetsBasedConfig(), + worker_config=WorkerConfig(), ) @@ -288,7 +288,7 @@ def test_create_children_jobs() -> None: }, processing_step=root_step, common_config=CommonConfig(), - datasets_based_config=DatasetsBasedConfig(), + worker_config=WorkerConfig(), ) assert job_runner.should_skip_job() is False # we add an entry to the cache diff --git a/services/worker/tests/test_loop.py b/services/worker/tests/test_loop.py index 3901254021..59058ee40f 100644 --- a/services/worker/tests/test_loop.py +++ b/services/worker/tests/test_loop.py @@ -5,7 +5,7 @@ from libcommon.queue import Queue from libcommon.resources import CacheMongoResource, QueueMongoResource -from worker.config import AppConfig, DatasetsBasedConfig, LoopConfig +from worker.config import AppConfig, WorkerConfig from worker.job_runner import JobInfo, JobRunner from worker.job_runner_factory import BaseJobRunnerFactory from worker.loop import Loop @@ -32,14 +32,14 @@ def compute(self) -> Mapping[str, Any]: class DummyJobRunnerFactory(BaseJobRunnerFactory): def __init__(self, processing_step: ProcessingStep) -> None: self.common_config = CommonConfig() - self.datasets_based_config = DatasetsBasedConfig() + self.worker_config = WorkerConfig() self.processing_step = processing_step def _create_job_runner(self, job_info: JobInfo) -> JobRunner: return DummyJobRunner( job_info=job_info, common_config=self.common_config, - datasets_based_config=self.datasets_based_config, + worker_config=self.worker_config, processing_step=self.processing_step, ) @@ -57,7 +57,7 @@ def test_process_next_job( library_cache_paths=libraries_resource.storage_paths, queue=queue, job_runner_factory=factory, - loop_config=LoopConfig(), + worker_config=WorkerConfig(), ) assert loop.process_next_job() is False dataset = "dataset" diff --git a/tools/docker-compose-base.yml b/tools/docker-compose-base.yml index cb87973bdb..7da735f215 100644 --- a/tools/docker-compose-base.yml +++ b/tools/docker-compose-base.yml @@ -16,16 +16,16 @@ services: QUEUE_MONGO_URL: ${QUEUE_MONGO_URL-mongodb://mongodb} # use mongo container by default QUEUE_MONGO_DATABASE: ${QUEUE_MONGO_DATABASE-datasets_server_queue} # worker - WORKER_LOOP_MAX_DISK_USAGE_PCT: ${WORKER_LOOP_MAX_DISK_USAGE_PCT-90} - WORKER_LOOP_MAX_LOAD_PCT: ${WORKER_LOOP_MAX_LOAD_PCT-70} - WORKER_LOOP_MAX_MEMORY_PCT: ${WORKER_LOOP_MAX_MEMORY_PCT-80} - WORKER_LOOP_SLEEP_SECONDS: ${WORKER_LOOP_SLEEP_SECONDS-15} + WORKER_CONTENT_MAX_BYTES: ${WORKER_CONTENT_MAX_BYTES-10_000_000} + WORKER_MAX_DISK_USAGE_PCT: ${WORKER_MAX_DISK_USAGE_PCT-90} + WORKER_MAX_LOAD_PCT: ${WORKER_MAX_LOAD_PCT-70} + WORKER_MAX_MEMORY_PCT: ${WORKER_MAX_MEMORY_PCT-80} + WORKER_SLEEP_SECONDS: ${WORKER_SLEEP_SECONDS-15} datasets-worker: extends: service: common environment: # datasets DATASETS_BASED_HF_DATASETS_CACHE: ${HF_DATASETS_CACHE-/datasets-cache} - DATASETS_BASED_CONTENT_MAX_BYTES: 
${DATASETS_BASED_CONTENT_MAX_BYTES-10_000_000} HF_MODULES_CACHE: ${HF_DATASETS_CACHE-/modules-cache} NUMBA_CACHE_DIR: ${NUMBA_CACHE_DIR-/numba-cache} diff --git a/tools/docker-compose-datasets-server.yml b/tools/docker-compose-datasets-server.yml index 676cc299a5..f3a81dcc28 100644 --- a/tools/docker-compose-datasets-server.yml +++ b/tools/docker-compose-datasets-server.yml @@ -74,7 +74,6 @@ services: build: context: .. dockerfile: services/worker/Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - splits-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -84,7 +83,7 @@ services: file: docker-compose-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/config-names" # hard-coded + WORKER_ENDPOINT: "/config-names" # hard-coded depends_on: - mongodb restart: always @@ -92,7 +91,6 @@ services: build: context: .. dockerfile: services/worker/Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - splits-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -102,7 +100,7 @@ services: file: docker-compose-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/split-names" # hard-coded + WORKER_ENDPOINT: "/split-names" # hard-coded depends_on: - mongodb restart: always @@ -110,7 +108,6 @@ services: build: context: .. dockerfile: services/worker/Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - splits-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -120,7 +117,7 @@ services: file: docker-compose-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/splits" # hard-coded + WORKER_ENDPOINT: "/splits" # hard-coded depends_on: - mongodb restart: always @@ -128,7 +125,6 @@ services: build: context: .. dockerfile: services/worker/Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - first-rows-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -140,13 +136,13 @@ services: environment: ASSETS_BASE_URL: "http://localhost:${PORT_REVERSE_PROXY-8000}/assets" # hard-coded to work with the reverse-proxy ASSETS_STORAGE_DIRECTORY: ${ASSETS_STORAGE_DIRECTORY-/assets} - DATASETS_BASED_ENDPOINT: "/first-rows" # hard-coded + WORKER_ENDPOINT: "/first-rows" # hard-coded FIRST_ROWS_MAX_BYTES: ${FIRST_ROWS_MAX_BYTES-1_000_000} FIRST_ROWS_MAX_NUMBER: ${FIRST_ROWS_MAX_NUMBER-100} FIRST_ROWS_MIN_CELL_BYTES: ${FIRST_ROWS_MIN_CELL_BYTES-100} FIRST_ROWS_MIN_NUMBER: ${FIRST_ROWS_MIN_NUMBER-10} FIRST_ROWS_COLUMNS_MAX_NUMBER: ${FIRST_ROWS_COLUMNS_MAX_NUMBER-1_000} - WORKER_LOOP_STORAGE_PATHS: ${ASSETS_STORAGE_DIRECTORY-/assets} + WORKER_STORAGE_PATHS: ${ASSETS_STORAGE_DIRECTORY-/assets} # ^ note: the datasets cache is automatically added, so no need to add it here depends_on: - mongodb @@ -155,7 +151,6 @@ services: build: context: .. 
dockerfile: services/worker/Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - parquet-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -165,7 +160,7 @@ services: file: docker-compose-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/parquet-and-dataset-info" # hard-coded + WORKER_ENDPOINT: "/parquet-and-dataset-info" # hard-coded PARQUET_AND_DATASET_INFO_BLOCKED_DATASETS: ${PARQUET_AND_DATASET_INFO_BLOCKED_DATASETS-} PARQUET_AND_DATASET_INFO_COMMIT_MESSAGE: ${PARQUET_AND_DATASET_INFO_COMMIT_MESSAGE-Update parquet files} PARQUET_AND_DATASET_INFO_COMMITTER_HF_TOKEN: ${PARQUET_AND_DATASET_INFO_COMMITTER_HF_TOKEN-} @@ -181,14 +176,13 @@ services: build: context: .. dockerfile: services/worker/Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw extends: file: docker-compose-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/parquet" # hard-coded + WORKER_ENDPOINT: "/parquet" # hard-coded depends_on: - mongodb restart: always @@ -196,14 +190,13 @@ services: build: context: .. dockerfile: services/worker/Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw extends: file: docker-compose-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/dataset-info" # hard-coded + WORKER_ENDPOINT: "/dataset-info" # hard-coded depends_on: - mongodb restart: always @@ -211,14 +204,13 @@ services: build: context: .. dockerfile: services/worker/Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw extends: file: docker-compose-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/sizes" # hard-coded + WORKER_ENDPOINT: "/sizes" # hard-coded depends_on: - mongodb restart: always diff --git a/tools/docker-compose-dev-base.yml b/tools/docker-compose-dev-base.yml index 4b488ce97e..617997bb78 100644 --- a/tools/docker-compose-dev-base.yml +++ b/tools/docker-compose-dev-base.yml @@ -16,17 +16,17 @@ services: QUEUE_MONGO_URL: ${QUEUE_MONGO_URL-mongodb://mongodb} # use mongo container by default QUEUE_MONGO_DATABASE: ${QUEUE_MONGO_DATABASE-datasets_server_queue} # worker - WORKER_LOOP_MAX_DISK_USAGE_PCT: ${WORKER_LOOP_MAX_DISK_USAGE_PCT-90} - WORKER_LOOP_MAX_LOAD_PCT: ${WORKER_LOOP_MAX_LOAD_PCT-70} - WORKER_LOOP_MAX_MEMORY_PCT: ${WORKER_LOOP_MAX_MEMORY_PCT-80} - WORKER_LOOP_SLEEP_SECONDS: ${WORKER_LOOP_SLEEP_SECONDS-15} + WORKER_CONTENT_MAX_BYTES: ${WORKER_CONTENT_MAX_BYTES-10_000_000} + WORKER_MAX_DISK_USAGE_PCT: ${WORKER_MAX_DISK_USAGE_PCT-90} + WORKER_MAX_LOAD_PCT: ${WORKER_MAX_LOAD_PCT-70} + WORKER_MAX_MEMORY_PCT: ${WORKER_MAX_MEMORY_PCT-80} + WORKER_SLEEP_SECONDS: ${WORKER_SLEEP_SECONDS-15} datasets-worker: extends: service: common environment: # datasets DATASETS_BASED_HF_DATASETS_CACHE: ${HF_DATASETS_CACHE-/datasets-cache} - DATASETS_BASED_CONTENT_MAX_BYTES: ${DATASETS_BASED_CONTENT_MAX_BYTES-10_000_000} HF_MODULES_CACHE: ${HF_DATASETS_CACHE-/modules-cache} NUMBA_CACHE_DIR: ${NUMBA_CACHE_DIR-/numba-cache} # volumes to local source directory for development diff --git a/tools/docker-compose-dev-datasets-server.yml b/tools/docker-compose-dev-datasets-server.yml index 
208b0a4d5f..d8cb0633f7 100644 --- a/tools/docker-compose-dev-datasets-server.yml +++ b/tools/docker-compose-dev-datasets-server.yml @@ -74,7 +74,6 @@ services: build: context: .. dockerfile: services/worker/dev.Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - splits-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -84,7 +83,7 @@ services: file: docker-compose-dev-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/config-names" # hard-coded + WORKER_ENDPOINT: "/config-names" # hard-coded depends_on: - mongodb restart: always @@ -92,7 +91,6 @@ services: build: context: .. dockerfile: services/worker/dev.Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - splits-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -102,7 +100,7 @@ services: file: docker-compose-dev-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/split-names" # hard-coded + WORKER_ENDPOINT: "/split-names" # hard-coded depends_on: - mongodb restart: always @@ -110,7 +108,6 @@ services: build: context: .. dockerfile: services/worker/dev.Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - splits-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -120,7 +117,7 @@ services: file: docker-compose-dev-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/splits" # hard-coded + WORKER_ENDPOINT: "/splits" # hard-coded depends_on: - mongodb restart: always @@ -128,7 +125,6 @@ services: build: context: .. dockerfile: services/worker/dev.Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - first-rows-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -140,13 +136,13 @@ services: environment: ASSETS_BASE_URL: "http://localhost:${PORT_REVERSE_PROXY-8000}/assets" # hard-coded to work with the reverse-proxy ASSETS_STORAGE_DIRECTORY: ${ASSETS_STORAGE_DIRECTORY-/assets} - DATASETS_BASED_ENDPOINT: "/first-rows" # hard-coded + WORKER_ENDPOINT: "/first-rows" # hard-coded FIRST_ROWS_MAX_BYTES: ${FIRST_ROWS_MAX_BYTES-1_000_000} FIRST_ROWS_MAX_NUMBER: ${FIRST_ROWS_MAX_NUMBER-100} FIRST_ROWS_MIN_CELL_BYTES: ${FIRST_ROWS_MIN_CELL_BYTES-100} FIRST_ROWS_MIN_NUMBER: ${FIRST_ROWS_MIN_NUMBER-10} FIRST_ROWS_COLUMNS_MAX_NUMBER: ${FIRST_ROWS_COLUMNS_MAX_NUMBER-1_000} - WORKER_LOOP_STORAGE_PATHS: ${ASSETS_STORAGE_DIRECTORY-/assets} + WORKER_STORAGE_PATHS: ${ASSETS_STORAGE_DIRECTORY-/assets} # ^ note: the datasets cache is automatically added, so no need to add it here depends_on: - mongodb @@ -155,7 +151,6 @@ services: build: context: .. 
dockerfile: services/worker/dev.Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw - parquet-datasets-cache:${HF_DATASETS_CACHE-/datasets-cache}:rw @@ -165,7 +160,7 @@ services: file: docker-compose-dev-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/parquet-and-dataset-info" # hard-coded + WORKER_ENDPOINT: "/parquet-and-dataset-info" # hard-coded PARQUET_AND_DATASET_INFO_BLOCKED_DATASETS: ${PARQUET_AND_DATASET_INFO_BLOCKED_DATASETS-} PARQUET_AND_DATASET_INFO_COMMIT_MESSAGE: ${PARQUET_AND_DATASET_INFO_COMMIT_MESSAGE-Update parquet files} PARQUET_AND_DATASET_INFO_COMMITTER_HF_TOKEN: ${PARQUET_AND_DATASET_INFO_COMMITTER_HF_TOKEN-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} @@ -181,14 +176,13 @@ services: build: context: .. dockerfile: services/worker/dev.Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw extends: file: docker-compose-dev-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/parquet" # hard-coded + WORKER_ENDPOINT: "/parquet" # hard-coded depends_on: - mongodb restart: always @@ -196,14 +190,13 @@ services: build: context: .. dockerfile: services/worker/dev.Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw extends: file: docker-compose-dev-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/dataset-info" # hard-coded + WORKER_ENDPOINT: "/dataset-info" # hard-coded depends_on: - mongodb restart: always @@ -211,14 +204,13 @@ services: build: context: .. dockerfile: services/worker/dev.Dockerfile - # image: ${IMAGE_WORKER_DATASETS_BASED?IMAGE_WORKER_DATASETS_BASED env var must be provided} volumes: - assets:${ASSETS_STORAGE_DIRECTORY-/assets}:rw extends: file: docker-compose-dev-base.yml service: datasets-worker environment: - DATASETS_BASED_ENDPOINT: "/sizes" # hard-coded + WORKER_ENDPOINT: "/sizes" # hard-coded depends_on: - mongodb restart: always