Skip to content

Commit

Permalink
fix(worker): Pass configured tolerations to AsyncOrchestratorPodProcess
Browse files Browse the repository at this point in the history
The previous implementation of AsyncOrchestratorPodProcess did not
account for the tolerations that are configured for the worker pods. If a
nodeSelector was configured to isolate pods into their own private node
group, the missing toleration could make these pods unschedulable,
since they would not tolerate the node taint on such a node group.
  • Loading branch information
justenwalker committed Dec 6, 2023
1 parent 7fc2209 commit 0a5d271
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.TolerationPOJO;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.workers.storage.DocumentStoreClient;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
Expand All @@ -17,6 +18,8 @@
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.SecretVolumeSourceBuilder;
import io.fabric8.kubernetes.api.model.StatusDetails;
import io.fabric8.kubernetes.api.model.Toleration;
import io.fabric8.kubernetes.api.model.TolerationBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMount;
Expand Down Expand Up @@ -372,7 +375,8 @@ public void create(final Map<String, String> allLabels,
final ResourceRequirements resourceRequirements,
final Map<String, String> fileMap,
final Map<Integer, Integer> portMap,
final Map<String, String> nodeSelectors) {
final Map<String, String> nodeSelectors,
final List<TolerationPOJO> tolerations) {
final List<Volume> volumes = new ArrayList<>();
final List<VolumeMount> volumeMounts = new ArrayList<>();
final List<EnvVar> envVars = new ArrayList<>();
Expand Down Expand Up @@ -482,6 +486,7 @@ public void create(final Map<String, String> allLabels,
.withInitContainers(initContainer)
.withVolumes(volumes)
.withNodeSelector(nodeSelectors)
.withTolerations(buildPodTolerations(tolerations))
.endSpec()
.build();

Expand Down Expand Up @@ -521,6 +526,19 @@ public void create(final Map<String, String> allLabels,
copyFilesToKubeConfigVolumeMain(createdPod, updatedFileMap);
}

/**
 * Converts the configured toleration settings into fabric8 {@link Toleration} model objects.
 *
 * <p>Each entry's key, effect, operator and value are copied over unchanged.
 *
 * @param tolerations toleration settings to convert; may be {@code null} or empty
 * @return one {@link Toleration} per input entry, in list order, or {@code null} when the input
 *         is {@code null} or empty
 */
private Toleration[] buildPodTolerations(final List<TolerationPOJO> tolerations) {
  final boolean nothingConfigured = tolerations == null || tolerations.isEmpty();
  if (nothingConfigured) {
    // Callers receive null (not an empty array) when there is nothing to convert.
    return null;
  }

  final Toleration[] podTolerations = new Toleration[tolerations.size()];
  int index = 0;
  for (final TolerationPOJO configured : tolerations) {
    podTolerations[index++] = new TolerationBuilder()
        .withKey(configured.getKey())
        .withEffect(configured.getEffect())
        .withOperator(configured.getOperator())
        .withValue(configured.getValue())
        .build();
  }
  return podTolerations;
}

private static void copyFilesToKubeConfigVolumeMain(final Pod podDefinition, final Map<String, String> files) {
final List<Map.Entry<String, String>> fileEntries = new ArrayList<>(files.entrySet());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,15 @@ public OUTPUT run(final INPUT input, final Path jobRoot) throws WorkerException
final var nodeSelectors =
isCustomConnector ? workerConfigs.getWorkerIsolatedKubeNodeSelectors().orElse(workerConfigs.getworkerKubeNodeSelectors())
: workerConfigs.getworkerKubeNodeSelectors();

final var tolerations = workerConfigs.getWorkerKubeTolerations();
try {
process.create(
allLabels,
resourceRequirements,
fileMap,
portMap,
nodeSelectors);
nodeSelectors,
tolerations);
} catch (final KubernetesClientException e) {
ApmTraceUtils.addExceptionToTrace(e);
throw new WorkerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ void testAsyncOrchestratorPodProcess(final String pullPolicy) throws Interrupted

asyncProcess.create(Map.of(), new WorkerConfigs(new EnvConfigs()).getResourceRequirements(), Map.of(
OrchestratorConstants.INIT_FILE_APPLICATION, AsyncOrchestratorPodProcess.NO_OP,
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap, workerConfigs.getworkerKubeNodeSelectors());
OrchestratorConstants.INIT_FILE_ENV_MAP, Jsons.serialize(envMap)), portMap, workerConfigs.getworkerKubeNodeSelectors(),
workerConfigs.getWorkerKubeTolerations());

// a final activity waits until there is output from the kube pod process
asyncProcess.waitFor(10, TimeUnit.SECONDS);
Expand Down

0 comments on commit 0a5d271

Please sign in to comment.