Skip to content

Commit

Permalink
Add limitations.
Browse files Browse the repository at this point in the history
  • Loading branch information
wanglijie95 committed Mar 16, 2022
1 parent 521d091 commit 5a93c1c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
10 changes: 6 additions & 4 deletions docs/content.zh/docs/deployment/elastic_scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ Adaptive 调度器可以通过[所有在名字包含 `adaptive-scheduler` 的配

## Adaptive Batch Scheduler

Adaptive Batch Scheduler 是一种可以自动推导每个算子并行度的批作业调度器。如果算子未设置并行度,调度器将根据其消费的数据量的大小来推导其并行度。这可以带来诸多好处:
Adaptive Batch Scheduler 是一种可以自动推导每个算子并行度的批作业调度器。如果算子未设置并行度,调度器将根据其消费的数据量的大小来推导其并行度(目前推导的并行度只会是 2^N, 细节详见 ["推导的并行度只能是 2 的幂次"](#局限性-2)。这可以带来诸多好处:
- 批作业用户可以从并行度调优中解脱出来
- 根据数据量自动推导并行度可以更好地适应每天变化的数据量
- SQL作业中的算子也可以分配不同的并行度
Expand All @@ -168,9 +168,9 @@ Adaptive Batch Scheduler 是一种可以自动推导每个算子并行度的批
- 由于 ["只支持所有数据交换都为 BLOCKING 模式的作业"](#局限性-2), 需要将 [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 配置为 `ALL-EXCHANGES-BLOCKING`(默认值) 。

除此之外,使用 Adaptive Batch Scheduler 时,以下相关配置也可以调整:
- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): 允许自动设置的并行度最小值
- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): 允许自动设置的并行度最大值
- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): 期望每个任务平均处理的数据量大小
- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): 允许自动设置的并行度最小值。需要配置为 2^N,否则也会被自动调整为 2^N。
- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): 允许自动设置的并行度最大值。需要配置为 2^N,否则也会被自动调整为 2^N。
- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): 期望每个任务平均处理的数据量大小。由于顶点的并行度会被调整为 2^N,因此实际每个任务平均处理的数据量大小将是该值的 0.75~1.5 倍。 另外需要注意的是,当出现数据倾斜,或者确定的并行度达到最大并行度(由于数据过多)时,一些任务实际处理的数据可能会远远超过这个值。
- [`jobmanager.adaptive-batch-scheduler.default-source-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-default-source-parallelism): source 算子的默认并行度

#### 配置算子的并行度为 `-1`
Expand All @@ -188,6 +188,8 @@ Adaptive Batch Scheduler 只会为用户未指定并行度的算子(并行度
### 局限性
- **只支持批作业**: Adaptive Batch Scheduler 只支持批作业。当提交的是一个流作业时,会抛出异常。
- **只支持所有数据交换都为 BLOCKING 模式的作业**: 目前 Adaptive Batch Scheduler 只支持 [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) 为 ALL-EXCHANGES-BLOCKING 的作业。
- **推导的并行度只能是 2 的幂次**: 为了使子分区可以均匀分配给下游任务,用户需要将 [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) 配置为 2^N, 推导出的并行度会是 2^M, 且满足 M < N。
- **不支持一些文件操作 API**: 不支持 `StreamExecutionEnvironment#readFile` `StreamExecutionEnvironment#readTextFile` `StreamExecutionEnvironment#createInput(FileInputFormat, ...)` 和所有使用了这些 API 的 source. 当使用了这些 API 时,会有一个独立的监控任务 (`Custom File Source`) 在真正的 source 前,Adaptive Batch Scheduler 无法处理这种情况。
- **Web UI 上展示的上游输出的数据量和下游收到的数据量可能不一致**: 在使用 Adaptive Batch Scheduler 时,对于 broadcast 边,上游算子发送的数据量和下游算子接收的数据量可能会不相等,这在 Web UI 的显示上可能会困扰用户。细节详见 [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler)

{{< top >}}
10 changes: 6 additions & 4 deletions docs/content/docs/deployment/elastic_scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ The behavior of Adaptive Scheduler is configured by [all configuration options c

## Adaptive Batch Scheduler

The Adaptive Batch Scheduler can automatically decide parallelisms of operators for batch jobs. If an operator is not set with a parallelism, the scheduler will decide parallelism for it according to the size of its consumed datasets. This can bring many benefits:
The Adaptive Batch Scheduler can automatically decide parallelisms of operators for batch jobs. If an operator is not set with a parallelism, the scheduler will decide parallelism for it according to the size of its consumed datasets (Note that the decided parallelism can only be a power of 2, see ["The decided parallelism can only be a power of 2"](#limitations-2) for details). This can bring many benefits:
- Batch job users can be relieved from parallelism tuning
- Automatically tuned parallelisms can better fit consumed datasets which have a varying volume size every day
- Operators from SQL batch jobs can be assigned with different parallelisms which are automatically tuned
Expand All @@ -170,9 +170,9 @@ To use Adaptive Batch Scheduler, you need to:
- Leave the [`execution.batch-shuffle-mode`]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) unset or explicitly set it to `ALL-EXCHANGES-BLOCKING` (default value) due to ["ALL-EXCHANGES-BLOCKING jobs only"](#limitations-2).

In addition, there are several related configuration options that may need adjustment when using Adaptive Batch Scheduler:
- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively
- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively
- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process
- [`jobmanager.adaptive-batch-scheduler.min-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-min-parallelism): The lower bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will also be rounded up to a power of 2 automatically.
- [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism): The upper bound of allowed parallelism to set adaptively. Currently, this option should be configured as a power of 2, otherwise it will also be rounded down to a power of 2 automatically.
- [`jobmanager.adaptive-batch-scheduler.avg-data-volume-per-task`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-avg-data-volume-per-task): The average size of data volume to expect each task instance to process. Note that since the parallelism of the vertices is adjusted to a power of 2, the actual average size will be 0.75~1.5 times this value. It is also important to note that when data skew occurs, or the decided parallelism reaches the max parallelism (due to too much data), the data actually processed by some tasks may far exceed this value.
- [`jobmanager.adaptive-batch-scheduler.default-source-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-default-source-parallelism): The default parallelism of data source.

#### Set the parallelism of operators to `-1`
Expand All @@ -191,6 +191,8 @@ Adaptive Batch Scheduler will only decide parallelism for operators whose parall

- **Batch jobs only**: Adaptive Batch Scheduler only supports batch jobs. Exception will be thrown if a streaming job is submitted.
- **ALL-EXCHANGES-BLOCKING jobs only**: At the moment, Adaptive Batch Scheduler only supports jobs whose [shuffle mode]({{< ref "docs/deployment/config" >}}#execution-batch-shuffle-mode) is `ALL-EXCHANGES-BLOCKING`.
- **The decided parallelism can only be a power of 2**: In order to make the subpartitoins evenly consumed by downstream tasks, user should configure the [`jobmanager.adaptive-batch-scheduler.max-parallelism`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-max-parallelism) to be a power of 2 (2^N), and the decided parallelism will also be a power of 2 (2^M and M < N).
- **No support for serveral file APIs**: No support for `StreamExecutionEnvironment#readFile` `StreamExecutionEnvironment#readTextFile` `StreamExecutionEnvironment#createInput(FileInputFormat, ...)` and all data sources using these APIs. When using these APIs, there will be a separate monitoring task (called a `Custom File Source`) as a predecessor to the actual data sources, which Adaptive Batch Scheduler cannot handle.
- **Inconsistent broadcast results metrics on WebUI**: In Adaptive Batch Scheduler, for broadcast results, the number of bytes/records sent by the upstream task counted by metric is not equal to the number of bytes/records received by the downstream task, which may confuse users when displayed on the Web UI. See [FLIP-187](https://cwiki.apache.org/confluence/display/FLINK/FLIP-187%3A+Adaptive+Batch+Job+Scheduler) for details.

{{< top >}}

0 comments on commit 5a93c1c

Please sign in to comment.