
Proposal: Support task-level parallelism #71

Closed
irvinlim opened this issue Apr 30, 2022 · 1 comment · Fixed by #81 or #87
Labels: area/workloads, component/execution, kind/proposal


irvinlim commented Apr 30, 2022

Motivation

Users may wish to shard their periodic jobs into multiple Pods. For example, every day at 12am we need to process a large batch of work; the volume of work may grow significantly over time, and it cannot be started earlier than 12am (i.e. on the previous day). As it stands, the only option is to scale a single Pod vertically, which becomes impractical beyond a certain point. As such, we want to evaluate how to support horizontal scaling of Job Pods (i.e. task-level parallelism).

The Job object in K8s currently supports parallel execution: https://kubernetes.io/docs/concepts/workloads/controllers/job/#parallel-jobs. However, in my personal opinion, the K8s JobController API for controlling the parallelism of multiple Pods is neither very clear nor well-designed. We will outline the use cases, and attempt to propose an API design that supports, and potentially improves on, the existing one in K8s.
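
For reference, a minimal sketch of the existing batch/v1 API being discussed, using the "work queue" style of parallelism (the worker image below is a placeholder):

apiVersion: batch/v1
kind: Job
metadata:
  name: parallel-work-queue
spec:
  # Run up to 5 Pods at once. With completions left unset, no new Pods are
  # started once any Pod succeeds, and the Job succeeds once all Pods have
  # terminated with at least one success (work-queue semantics).
  parallelism: 5
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: worker
          image: example.com/worker:latest  # placeholder image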

It is also important to avoid over-designing this feature. A good principle to keep in mind is that Furiko's main focus is automatic timed/periodic jobs, not so much user-invoked or complex task workflows; for those, we are better off utilizing more complex workflow engines (e.g. Argo Workflows).

Use Cases

We will outline some use cases (including/highlighting those we have gotten internally):

  1. Support running a fixed number of Pods per Job at the same time: This would basically be equivalent to "scaling up" a Job to more replicas, but each Pod has to be assigned work independently of other Pods. This usually involves an external message queue.
    • The equivalent use case in K8s is "Parallel Jobs with a work queue".
    • The advantage is that there is a well-defined upper bound in the amount of resources required to run the Job, but the disadvantage is that any unexpected increase in work to be done could result in longer processing times.
  2. Support running a variable number of Pods per Job at the same time: This is an extension of (1), except that the parallelism factor is variable.
    • One idea could be to use (1) but allow changing the parallelism both at ad-hoc invocation time (prior to creation) and during runtime (after it is started), which could be controlled by an autoscaler of sorts. See (3) for more details on the latter.
    • If we allow the parallelism factor to depend on an external data source (e.g. Kafka topic lag), then it becomes dangerously close to a MapReduce design. I think it may be better to require the parallelism factor to be defined in the workload spec itself.
  3. Horizontal scaling of Pods while a Job is running: While a Job is running, we may want to update the number of Pod replicas if we realize that it may not complete in time without stopping its progress. (just an idea, no real use case for now)
    • This can be utilized by jobs which read from a queue with a central coordinator, but not so much when the number of shards is fixed. One notable exception is Kafka, where consumers can rebalance when new consumers are added to the consumer group, and scale up to a maximum of the number of partitions.
    • Implementing this should be straightforward, but we have to be careful about the scale-down case, since it may conflict with completion policies (see below). A simple way to get around this is to prevent scale-down, and only allow scale-up.
  4. Stateless/Stateful parallel worker patterns: When a Job has multiple parallel Pods, it may be that some Pods pick up work from a queue such that other Pods don't need to, so it is sufficient to terminate once any Pod is finished. On the other hand, if every Pod works only on its own subset of work (e.g. using consistent hashing), then we need to wait for ALL Pods to finish before terminating. As such, we need to support both use cases.

I personally don't think there is a need for "fixed completion count" jobs like in K8s; at least, I have never encountered a use case which depends on this. Perhaps the closest equivalent of "fixed completion count" is to start N Jobs at the same time with a fixed concurrency factor, which is slightly different from the topic we are currently discussing.

Requirements

  1. The parallelism feature must not conflict with the retries feature of Jobs. In other words, the distinction between retries and parallel tasks should be clear. In the batch/v1 JobController, it depends on a Pod's restartPolicy to retry containers, but we explicitly support pod-level retries.
  2. Control the completion and success policy: The user may want to be explicit about what constitutes a successful or a completed Job, and when. In the case of a single Pod, an exit code of 0 (i.e. the Pod's phase is Succeeded) is sufficient to indicate a successful Job, but it becomes less clear once we have parallel Pods.
  3. Control early termination policies: When a single Pod fails, it could be possible that we want to immediately terminate early from the Job in order to avoid unnecessary work, or to wait for all Pods to gracefully finish their existing work.

Proposed Implementation 1

We are not going forward with this proposal.


We will implement a new CRD, ShardedSet (NOTE: the name is currently TBC). This CRD is most similar to a ReplicaSet, except that it controls running to completion (which, ironically, makes it similar to the batch/v1 Job itself).

The implementation of a ShardedSet will closely follow the Indexed Job pattern in batch/v1 (https://kubernetes.io/docs/tasks/job/indexed-parallel-processing-static/), but defines the completion policy much more explicitly than is currently supported by the batch/v1 Job API, and avoids the confusion of having to define completions. See kubernetes/kubernetes#28486 for some related discussion about how completions are handled.
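
For comparison, a minimal sketch of the batch/v1 Indexed Job pattern referenced above (the worker image is a placeholder, and this mode requires a reasonably recent Kubernetes version); each Pod receives its index via the JOB_COMPLETION_INDEX environment variable and the batch.kubernetes.io/job-completion-index annotation:

apiVersion: batch/v1
kind: Job
metadata:
  name: indexed-job
spec:
  completions: 5          # indexes 0..4 must each complete successfully
  parallelism: 5
  completionMode: Indexed
  template:
    spec:
      restartPolicy: Never
      containers:
        - name: worker
          image: example.com/worker:latest  # placeholder image
          command: ["sh", "-c", "echo processing index $JOB_COMPLETION_INDEX"]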

CRD Design

Example of a proposed ShardedSet custom resource object, with all possible API fields (including future expansion):

apiVersion: execution.furiko.io/v1alpha1
kind: ShardedSet
spec:
  # Defines that exactly 5 tasks are run in parallel.
  # Each task receives a separate task index, and it is guaranteed that
  # no two concurrently running tasks will receive the same task index.
  parallelism: 5

  # Defines retry policy.
  retries:
    # Maximum attempts per shard, beyond which we stop creating new tasks for the shard. Defaults to 1.
    maxAttemptsPerShard: 3

    # Maximum total attempts across all shards. Cannot exceed maxAttemptsPerShard * parallelism (which is also the default).
    # Once this limit is reached, a failed shard is not retried and is considered a shard failure.
    maxAttemptsTotal: 15

  # Defines completion policy.
  completion:
    # Defines when a ShardedSet is completed. Options:
    #  - OnAllShardsSuccess (default): Stop once all shards are successful.
    #  - OnAnyShardSuccess: Stop once any shard is successful.
    #  - OnAnyShardFailure: Stop once any shard is failed.
    condition: OnAllShardsSuccess

    # Defines what to do on completion, depending on whether the ShardedSet is successful or failed, the defaults are shown below.
    # Note that this has no effect for OnAllShardsSuccess, since by definition all shards would have completed prior to taking this action.
    onSuccess: WaitForRemaining
    onFailure: TerminateRemaining

  # The TaskTemplateSpec itself, we could further compose other task executors too!
  template: 
    pod:
      spec:
        containers: 
          - name: container
            image: alpine
            args: ["echo", "Hello world"]
            env:
              # The task can determine its shard index using this env var.
              - name: SHARD_INDEX
                value: "${parallel.shard_index}"
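
For illustration, a hypothetical worker container might use the injected SHARD_INDEX to select its fixed partition of work (the worker binary and its flags below are made up):

containers:
  - name: worker
    image: example.com/worker:latest   # placeholder image
    command: ["sh", "-c"]
    # Hypothetical flags: the worker processes only the keys that hash to its shard.
    args: ["exec /app/worker --shard=$SHARD_INDEX --total-shards=5"]
    env:
      - name: SHARD_INDEX
        value: "${parallel.shard_index}"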

Complete breakdown for completion.condition success cases:

  • OnAllShardsSuccess
    • When some shard succeeds, do nothing.
    • When all shards succeed, succeed the ShardedSet.
  • OnAnyShardSuccess
    • When some shard succeeds, immediately succeed the ShardedSet.
  • OnAnyShardFailure
    • When some shard succeeds, do nothing.
    • When all shards succeed, succeed the ShardedSet.

Complete breakdown for completion.condition failure cases:

  • OnAllShardsSuccess
    • If any shard cannot retry further (has exhausted maxAttemptsPerShard or maxAttemptsTotal), immediately fail the ShardedSet.
  • OnAnyShardSuccess
    • If any shard cannot retry further (has exhausted maxAttemptsPerShard or maxAttemptsTotal), do nothing.
    • If all shards have failed and cannot retry further, fail the ShardedSet.
  • OnAnyShardFailure
    • If any shard cannot retry further (has exhausted maxAttemptsPerShard or maxAttemptsTotal), immediately fail the ShardedSet.

Note that:

  • In the success case, OnAllShardsSuccess == OnAnyShardFailure.
  • In the failure case, OnAllShardsSuccess == OnAnyShardFailure.

In other words, OnAllShardsSuccess and OnAnyShardFailure behave identically in both breakdowns above, so only two distinct behaviors remain. Therefore, we can simplify it to just AllShardsSucceeded and AnyShardSucceeded. (Help me verify this claim??)
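
If the claim holds, the completion block above could collapse to something like this (a hypothetical simplified form, not part of the proposal above):

completion:
  # Hypothetical simplified values, assuming the equivalence claim above holds:
  #  - AllShardsSucceeded: succeed only when every shard succeeds; fail as soon as
  #    any shard exhausts its attempts.
  #  - AnyShardSucceeded: succeed as soon as any shard succeeds; fail only when
  #    every shard has exhausted its attempts.
  condition: AllShardsSucceeded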

Inside a JobConfig, the user will define it like this:

apiVersion: execution.furiko.io/v1alpha1
kind: JobConfig
spec:
  template:
    spec:
      # Retry the ShardedSet up to 3 times
      maxAttempts: 3
      task:
        template:
          # This is the start of the ShardedSetTemplateSpec
          parallel:
            metadata:
              labels: ...
            spec:
              parallelism: 5
              template:
                pod:
                  spec:
                    containers: [ ... ]

Pros and Cons

  • Pros:
    • Very easy to reason about. The composition of two separate APIs is clear from both a developer and a user perspective, and future extensions to the ShardedSet controller avoid conflicting with the core JobController.
    • Most users will not need to think about the additional API fields that are introduced for parallelism if they don't need it. In my opinion, this is the biggest issue with the batch/v1 Job.
  • Cons:
    • By composing a secondary task executor to achieve task-level parallelism, we may be prematurely confining the design to only support a subset of use cases. For example, by separating the retry and parallel sets into distinct layers we may constrain the possible expansion options in the future.
    • Additional implementation cost, although part of it is offset by not having to ensure that the existing JobController behavior remains unbroken, which we would have to do for Option 2.
    • Potentially duplicate logic in both ShardedSet and Job controllers (e.g. task adoption, task executor).

Proposed Implementation 2

Another way is to avoid the use of a CRD, and instead implement parallelism directly in the JobController and update the Job API.

We will add the following fields to the spec:

spec:
  taskTemplate:
    parallelism:
      withCount: 3
      completionStrategy: AllSuccessful

Some possible parallelism types:

  • withCount: Specify an absolute count; the index number will be generated from 0 to N-1 and made available in ${task.index_num}
  • withKeys: Specify a list of string keys; each key will be made available in ${task.index_key}
  • withMatrix: Specify a map of keys to lists of string values; each key will be available in ${task.index_matrix.<matrix_key>}. This is to support common parallel patterns (e.g. CI on multiple platform and version combinations). See the sketch below.
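
To make the other two types concrete, a hedged sketch of what withKeys and withMatrix might look like (only withCount appears in the snippet above, so the exact field shapes here are assumptions based on the list):

spec:
  taskTemplate:
    parallelism:
      # One task per key; the key is exposed as ${task.index_key}.
      withKeys: ["us-east-1", "eu-west-1", "ap-southeast-1"]

      # Alternatively, one task per combination; each value would be exposed as
      # ${task.index_matrix.<matrix_key>}, e.g. ${task.index_matrix.os}.
      # withMatrix:
      #   os: ["linux", "windows"]
      #   version: ["1.19", "1.20"]

      completionStrategy: AllSuccessful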

Some considerations:

  1. Retries will take place at the parallel-index level. This means that using a withCount of 3, each index (0, 1, 2) has its own maximum of 3 retries.
  2. The completionStrategy is similar to Proposal (1).

The main reason we are not using Proposal (1) is the complexity introduced by nested retries; it is actually clearer to inline the implementation into the same CRD/controller.

Alternatives

There are some alternatives to the above design to achieve the same requirements.

  1. Creating one JobConfig for each desired shard. The obvious downside is that you have duplicate objects and higher cost of maintenance, configuration drift, etc.
  2. Support starting multiple Jobs at each schedule. This is a very simple solution, but there are some drawbacks:
    • Jobs started at the same time are basically independent of each other, so we cannot determine the status of, or control, the workload as a single atomic unit.
    • Multiple Jobs started concurrently that spill over their normal run duration may eat into the maxConcurrency of the JobConfig (see Feature: Support maxConcurrency #16), resulting in fewer total Jobs being run than expected.

TODO List


irvinlim commented May 3, 2022

We will be adding this to the Alpha Release milestone, and this will be the second task executor to be added to Furiko. I think that we can solidify the task executor interface once this is complete.
