[GOBBLIN-2174] GoT YarnService Integration with DynamicScaling #4077

Merged on Dec 9, 2024 (23 commits)

Commits
8b8bb5a
Define Gobblin-on-Temporal`WorkforcePlan` and dynamic `ScalingDirecti…
phet Oct 18, 2024
8eb3d4e
change package name from `dynscale` to `dynamic` and add a few comments
phet Oct 21, 2024
a788a8f
initial changes for yarn integration for dynamic scaling
Blazer-007 Nov 13, 2024
f1c859c
added worker profile config to be used while starting container
Blazer-007 Nov 18, 2024
7e7113c
removed getter
Blazer-007 Nov 18, 2024
0528f71
addressed few comments
Blazer-007 Nov 19, 2024
8a51f6c
adding dummy scaling directive source for testing
Blazer-007 Nov 23, 2024
64eb6e2
adding changes that was removed during rebase
Blazer-007 Nov 23, 2024
7094023
addressed few review comments
Blazer-007 Nov 28, 2024
9ccf8ae
changed initial container request to use baseline worker profile
Blazer-007 Nov 28, 2024
606349e
added abstractdynamicscalingmanger
Blazer-007 Nov 29, 2024
0c87a75
fix typo and added one extra log line while launching initial containers
Blazer-007 Nov 29, 2024
c7a6d1a
added unit test for buildcontainercommand
Blazer-007 Nov 29, 2024
3883837
added unit test for baselineworkerprofilecreation
Blazer-007 Nov 29, 2024
c478eb5
added unit tests for runnable
Blazer-007 Nov 29, 2024
15396fc
refactored tests
Blazer-007 Nov 30, 2024
e511968
minor comment update
Blazer-007 Dec 1, 2024
88cd732
addressed few comments
Blazer-007 Dec 5, 2024
b593ca5
fixed yarn service test
Blazer-007 Dec 6, 2024
fbd4890
added DynamicScalingYarnServiceTest
Blazer-007 Dec 6, 2024
ae1fb4c
removed unused line
Blazer-007 Dec 6, 2024
4503325
use renderName while printing profile name
Blazer-007 Dec 7, 2024
2e78702
corrected tests
Blazer-007 Dec 9, 2024
Original file line number Diff line number Diff line change
@@ -64,4 +64,9 @@ public interface GobblinTemporalConfigurationKeys {
String TEMPORAL_NUM_WORKERS_PER_CONTAINER = PREFIX + "num.workers.per.container";
int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1;
String TEMPORAL_CONNECTION_STRING = PREFIX + "connection.string";

/**
* Prefix for Gobblin-on-Temporal Dynamic Scaling
*/
String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";
}
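To make the key layout concrete, here is a minimal sketch of how the new prefix composes with the suffixes used elsewhere in this PR (`polling.interval`, `directives.dir`, `errors.dir`). The class name `DynamicScalingKeysSketch` is hypothetical, and the value of `PREFIX` is an assumption: the diff only shows that `PREFIX` exists, not what it contains.

```java
// Sketch only: a stand-in for the real GobblinTemporalConfigurationKeys interface.
final class DynamicScalingKeysSketch {
  // ASSUMPTION: the real PREFIX value is not shown in this diff.
  static final String PREFIX = "gobblin.temporal.";

  // Added by this PR (shown in the hunk above).
  static final String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";

  // Suffixes as they appear in the manager classes later in this PR.
  static final String POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval";
  static final String DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir";
  static final String ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir";

  public static void main(String[] args) {
    // Print the fully qualified keys a job config would need to set.
    System.out.println(POLLING_INTERVAL);
    System.out.println(DIRECTIVES_DIR);
    System.out.println(ERRORS_DIR);
  }
}
```

Because every dynamic-scaling key hangs off one prefix, a job config can group them in a single block, provided the assumed `PREFIX` matches the real one.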
Original file line number Diff line number Diff line change
@@ -18,12 +18,23 @@
package org.apache.gobblin.temporal.dynamic;

import com.typesafe.config.Config;
import lombok.AllArgsConstructor;
import lombok.Data;


/** A named worker {@link Config} */
@Data
@AllArgsConstructor
public class WorkerProfile {
private final String name;
private final Config config;

/**
* Constructs a {@code WorkerProfile} with the baseline name and the specified configuration.
*
* @param config the configuration for the worker profile
*/
public WorkerProfile(Config config) {
this(WorkforceProfiles.BASELINE_NAME, config);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.yarn;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.apache.commons.collections.CollectionUtils;

import com.typesafe.config.Config;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.AbstractIdleService;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;

/**
* This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing
* the latest scaling directives to the {@link DynamicScalingYarnService} for processing.
*
* This is an abstract class that provides the basic functionality for managing dynamic scaling. Subclasses should implement
* {@link #createScalingDirectiveSource()} to provide a {@link ScalingDirectiveSource} that will be used to get scaling directives.
*
* The concrete subclass must be supplied as the value of the config {@link org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES}.
*/
@Slf4j
public abstract class AbstractDynamicScalingYarnServiceManager extends AbstractIdleService {

protected static final String DYNAMIC_SCALING_POLLING_INTERVAL = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
private static final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
protected final Config config;
private final DynamicScalingYarnService dynamicScalingYarnService;
private final ScheduledExecutorService dynamicScalingExecutor;

public AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) {
this.config = appMaster.getConfig();
if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService();
} else {
String errorMsg = "Failure while getting YarnService instance from GobblinTemporalApplicationMaster::get_yarnService():"
+ " YarnService {" + appMaster.get_yarnService().getClass().getName() + "} is not an instance of DynamicScalingYarnService";
log.error(errorMsg);
throw new RuntimeException(errorMsg);
}
this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log),
Optional.of("DynamicScalingExecutor")));
}

@Override
protected void startUp() {
int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL,
DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
log.info("Starting the {} with re-scaling interval of {} seconds", this.getClass().getSimpleName(), scheduleInterval);

this.dynamicScalingExecutor.scheduleAtFixedRate(
new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, createScalingDirectiveSource()),
scheduleInterval, scheduleInterval, TimeUnit.SECONDS
);
}

@Override
protected void shutDown() {
log.info("Stopping the {}", this.getClass().getSimpleName());
ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, Optional.of(log));
}

/**
* Create a {@link ScalingDirectiveSource} to use for getting scaling directives.
*/
protected abstract ScalingDirectiveSource createScalingDirectiveSource();

/**
* A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the
* {@link DynamicScalingYarnService} for processing.
*/
@AllArgsConstructor
static class GetScalingDirectivesRunnable implements Runnable {
private final DynamicScalingYarnService dynamicScalingYarnService;
private final ScalingDirectiveSource scalingDirectiveSource;

@Override
public void run() {
try {
List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives();
if (CollectionUtils.isNotEmpty(scalingDirectives)) {
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
}
} catch (IOException e) {
log.error("Failed to get scaling directives", e);
} catch (Throwable t) {
log.error("Unexpected error with dynamic scaling via directives", t);
}
}
}
}
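The `catch (Throwable t)` in `GetScalingDirectivesRunnable` matters because `ScheduledExecutorService.scheduleAtFixedRate` suppresses all subsequent executions once a run throws. Below is a minimal plain-Java sketch of the same guarded-poll pattern. `PollingSketch`, `guardedPoll`, and the `String` directives are hypothetical stand-ins for the Gobblin types, not the PR's actual classes.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class PollingSketch {
  /** Wraps one poll so that no exception can escape into the scheduler. */
  static Runnable guardedPoll(Supplier<List<String>> source, Consumer<List<String>> sink) {
    return () -> {
      try {
        List<String> directives = source.get();
        // Forward only non-empty batches, as the runnable in the diff does.
        if (directives != null && !directives.isEmpty()) {
          sink.accept(directives);
        }
      } catch (Throwable t) {
        // Log and swallow: an escaping exception would cancel every later run.
        System.err.println("poll failed: " + t);
      }
    };
  }

  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger calls = new AtomicInteger();
    Runnable poll = guardedPoll(() -> {
      // The first poll fails; later polls succeed and are still scheduled.
      if (calls.incrementAndGet() == 1) {
        throw new IllegalStateException("transient failure");
      }
      return Collections.singletonList("directive-" + calls.get());
    }, batch -> System.out.println("received " + batch));

    // Same shape as startUp(): initial delay equals the polling period.
    executor.scheduleAtFixedRate(poll, 100, 100, TimeUnit.MILLISECONDS);
    Thread.sleep(450);
    executor.shutdownNow();
  }
}
```

Without the try/catch inside the runnable, the first failing poll would silently end dynamic scaling for the rest of the application's lifetime.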
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.yarn;

import java.util.List;

import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import com.google.common.eventbus.EventBus;
import com.typesafe.config.Config;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.StaffingDeltas;
import org.apache.gobblin.temporal.dynamic.WorkerProfile;
import org.apache.gobblin.temporal.dynamic.WorkforcePlan;
import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
import org.apache.gobblin.temporal.dynamic.WorkforceStaffing;
import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;

/**
* Service for dynamically scaling Gobblin containers running on YARN.
* This service manages workforce staffing and plans, and requests new containers as needed.
*/
@Slf4j
public class DynamicScalingYarnService extends YarnService {

/** Current count of containers already requested for each worker profile. */
private final WorkforceStaffing actualWorkforceStaffing;
/** Current total workforce plan, per the latest scaling directives received. */
private final WorkforcePlan workforcePlan;

public DynamicScalingYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception {
super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus);

this.actualWorkforceStaffing = WorkforceStaffing.initialize(0);
this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY));
}

@Override
protected synchronized void requestInitialContainers() {
StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}

/**
* Revises the workforce plan and requests new containers based on the given scaling directives.
*
* @param scalingDirectives the list of scaling directives
*/
public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) {
if (CollectionUtils.isEmpty(scalingDirectives)) {
return;
}
this.workforcePlan.reviseWhenNewer(scalingDirectives);
StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing);
requestNewContainersForStaffingDeltas(deltas);
}

private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) {
deltas.getPerProfileDeltas().forEach(profileDelta -> {
if (profileDelta.getDelta() > 0) { // scale up!
WorkerProfile workerProfile = profileDelta.getProfile();
String profileName = workerProfile.getName();
int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0);
int delta = profileDelta.getDelta();
log.info("Requesting {} new containers for profile {} having currently {} containers", delta,
WorkforceProfiles.renderName(profileName), currNumContainers);
requestContainersForWorkerProfile(workerProfile, delta);
// update our staffing after requesting new containers
this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis());
} else if (profileDelta.getDelta() < 0) { // scale down!
// TODO: Decide how to handle negative deltas
log.warn("Handling of negative deltas is not supported yet: profile {} delta {}",
profileDelta.getProfile().getName(), profileDelta.getDelta());
} // else, already at staffing plan (or at least have requested, so in-progress)
});
}

}
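The request path above is a plan-versus-actual reconciliation: compute per-profile deltas, request containers for the positive ones, and immediately record the new count so a repeated call does not request them again. The sketch below shows that idea with plain maps. `StaffingDeltaSketch` and its methods are hypothetical, and the real `WorkforcePlan.calcStaffingDeltas` may well differ (for example, in how it treats profiles absent from the plan).

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StaffingDeltaSketch {
  /** Planned minus actual per profile; profiles already at their target are omitted. */
  static Map<String, Integer> calcDeltas(Map<String, Integer> planned, Map<String, Integer> actual) {
    Map<String, Integer> deltas = new LinkedHashMap<>();
    planned.forEach((profile, target) -> {
      int delta = target - actual.getOrDefault(profile, 0);
      if (delta != 0) {
        deltas.put(profile, delta);
      }
    });
    return deltas;
  }

  /** Applies positive deltas to {@code actual}; returns how many containers were "requested". */
  static int requestScaleUps(Map<String, Integer> deltas, Map<String, Integer> actual) {
    int requested = 0;
    for (Map.Entry<String, Integer> entry : deltas.entrySet()) {
      if (entry.getValue() > 0) {
        // Record the request right away so the next reconciliation sees it as in progress.
        actual.merge(entry.getKey(), entry.getValue(), Integer::sum);
        requested += entry.getValue();
      }
      // Negative deltas are skipped, mirroring the TODO in the diff.
    }
    return requested;
  }
}
```

Updating the actual staffing at request time, rather than at allocation time, is what makes a second call with the same directives produce no new requests.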
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.temporal.yarn;

import java.util.Optional;

import org.apache.hadoop.fs.FileSystem;

import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;

/**
* {@link FsScalingDirectiveSource} based implementation of {@link AbstractDynamicScalingYarnServiceManager}.
*/
public class FsSourceDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager {
// TODO: replace fetching of these configs using a new method similar to JobStateUtils::getWorkDirRoot
public final static String DYNAMIC_SCALING_DIRECTIVES_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
public final static String DYNAMIC_SCALING_ERRORS_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";
Comment on lines +33 to +34

phet (Contributor), Dec 5, 2024:
these are fine for now, but I suspect we'll move to basing these directories on the JobStateUtils::getWorkDirRoot

Member Author:
Yes, will move it. Added a TODO as a reminder.

private final FileSystem fs;

public FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) {
super(appMaster);
this.fs = appMaster.getFs();
}

@Override
protected ScalingDirectiveSource createScalingDirectiveSource() {
return new FsScalingDirectiveSource(
this.fs,
this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
);
}
}
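One detail worth noting about `createScalingDirectiveSource()`: Typesafe Config's `getString` throws `ConfigException.Missing` for an absent path rather than returning `null`, so wrapping it in `Optional.ofNullable` does not by itself make the errors directory optional. The sketch below shows a `hasPath` guard using a hypothetical map-backed stand-in (`StrictConfig`) in place of the real `Config`. It is one possible approach, not what the PR does.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;

final class OptionalKeySketch {
  /** Hypothetical stand-in for a config whose getString throws on a missing key. */
  static final class StrictConfig {
    private final Map<String, String> values;

    StrictConfig(Map<String, String> values) {
      this.values = new HashMap<>(values);
    }

    boolean hasPath(String key) {
      return values.containsKey(key);
    }

    String getString(String key) {
      String value = values.get(key);
      if (value == null) {
        throw new NoSuchElementException("No configuration setting found for key '" + key + "'");
      }
      return value;
    }
  }

  /** Returns the value when the key is present, otherwise an empty Optional. */
  static Optional<String> optionalString(StrictConfig config, String key) {
    return config.hasPath(key) ? Optional.of(config.getString(key)) : Optional.<String>empty();
  }
}
```

With a guard like this, omitting the errors directory yields `Optional.empty()` instead of an exception at startup. Whether that is the desired behavior is a design decision for the maintainers.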
Original file line number Diff line number Diff line change
@@ -114,7 +114,7 @@ public GobblinTemporalApplicationMaster(String applicationName, String applicati
protected YarnService buildTemporalYarnService(Config config, String applicationName, String applicationId,
YarnConfiguration yarnConfiguration, FileSystem fs)
throws Exception {
- return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
+ return new DynamicScalingYarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus);
}

/**