This repository has been archived by the owner on Sep 18, 2024. It is now read-only.

support shared storage for reusable mode #3354

Merged
merged 23 commits on Feb 25, 2021
Changes from 16 commits
36 changes: 36 additions & 0 deletions examples/trials/mnist-sharedstorage/config_azureblob.yml
@@ -0,0 +1,36 @@
authorName: default
experimentName: example_mnist
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
trainingServicePlatform: aml
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
  #choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
  #SMAC (SMAC should be installed through nnictl)
  builtinTunerName: TPE
  classArgs:
    #choice: maximize, minimize
    optimize_mode: maximize
trial:
  command: python3 mnist.py
  codeDir: .
  image: msranni/nni
amlConfig:
  subscriptionId: ${replace_to_your_subscriptionId}
  resourceGroup: ${replace_to_your_resourceGroup}
  workspaceName: ${replace_to_your_workspaceName}
  computeTarget: ${replace_to_your_computeTarget}
sharedStorage:
  storageType: AzureBlob
  localMountPoint: ${your/local/mount/point}
  remoteMountPoint: ${your/remote/mount/point}
  resourceGroupName: ${replace_to_your_resourceGroupName}
  storageAccountName: ${replace_to_your_storageAccountName}
  containerName: ${replace_to_your_containerName}
  # usermount means you have already mounted this storage on localMountPoint
  # nnimount means NNI will try to mount this storage on localMountPoint
  # nomount means the storage will not be mounted on the local machine; partial storage support will come in the future
  localMounted: nnimount
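Once an experiment using this config runs, the practical payoff of shared storage is that experiment and trial artifacts become visible under `localMountPoint` on the launching machine. A minimal sketch of browsing them; the `nni/<experiment-id>` layout and the id are hypothetical, for illustration only:

```python
import os

mount_point = '/home/user/nni_shared'        # your localMountPoint
experiment_id = 'GgGkYwxE'                   # hypothetical experiment id
trials_dir = os.path.join(mount_point, 'nni', experiment_id, 'trials')

# Browse trial outputs locally instead of pulling them from AML.
if os.path.isdir(trials_dir):
    print(sorted(os.listdir(trials_dir)))
```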
35 changes: 35 additions & 0 deletions examples/trials/mnist-sharedstorage/config_nfs.yml
@@ -0,0 +1,35 @@
authorName: default
experimentName: example_mnist
trialConcurrency: 1
maxExecDuration: 1h
maxTrialNum: 10
trainingServicePlatform: aml
searchSpacePath: search_space.json
#choice: true, false
useAnnotation: false
tuner:
  #choice: TPE, Random, Anneal, Evolution, BatchTuner, MetisTuner, GPTuner
  #SMAC (SMAC should be installed through nnictl)
  builtinTunerName: TPE
  classArgs:
    #choice: maximize, minimize
    optimize_mode: maximize
trial:
  command: python3 mnist.py
  codeDir: .
  image: msranni/nni
amlConfig:
  subscriptionId: ${replace_to_your_subscriptionId}
  resourceGroup: ${replace_to_your_resourceGroup}
  workspaceName: ${replace_to_your_workspaceName}
  computeTarget: ${replace_to_your_computeTarget}
sharedStorage:
  storageType: NFS
  localMountPoint: ${your/local/mount/point}
  remoteMountPoint: ${your/remote/mount/point}
  nfsServer: ${nfs-server-ip}
  exportedDirectory: ${nfs/exported/directory}
  # usermount means you have already mounted this storage on localMountPoint
  # nnimount means NNI will try to mount this storage on localMountPoint
  # nomount means the storage will not be mounted on the local machine; partial storage support will come in the future
  localMounted: nnimount
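The three `localMounted` modes differ only in who performs the local mount. A sketch of that decision for the NFS case above; `ensure_local_mount` is a hypothetical helper for illustration, not an NNI API:

```python
import subprocess

def ensure_local_mount(cfg: dict) -> None:
    """Hypothetical helper illustrating the three localMounted modes."""
    mode = cfg['localMounted']
    if mode == 'usermount':
        return  # you already mounted the share on localMountPoint yourself
    if mode == 'nnimount':
        # NNI performs the mount itself, roughly equivalent to:
        subprocess.run(
            ['sudo', 'mount', '-t', 'nfs',
             f"{cfg['nfsServer']}:{cfg['exportedDirectory']}",
             cfg['localMountPoint']],
            check=True)
    # 'nomount': nothing is mounted on the local machine
```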
166 changes: 166 additions & 0 deletions examples/trials/mnist-sharedstorage/mnist.py
@@ -0,0 +1,166 @@
"""
A deep MNIST classifier using convolutional layers.

This file is a modification of the official pytorch mnist example:
https://github.com/pytorch/examples/blob/master/mnist/main.py
"""

import os
import argparse
import logging
import nni
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from nni.utils import merge_parameter
from torchvision import datasets, transforms

logger = logging.getLogger('mnist_AutoML')


class Net(nn.Module):
    def __init__(self, hidden_size):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4*4*50, hidden_size)
        self.fc2 = nn.Linear(hidden_size, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4*4*50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)


def train(args, model, device, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        if (args['batch_num'] is not None) and batch_idx >= args['batch_num']:
            break
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args['log_interval'] == 0:
            logger.info('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))


def test(args, model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            # sum up batch loss
            test_loss += F.nll_loss(output, target, reduction='sum').item()
            # get the index of the max log-probability
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    accuracy = 100. * correct / len(test_loader.dataset)

    logger.info('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset), accuracy))

    return accuracy


def main(args):
    use_cuda = not args['no_cuda'] and torch.cuda.is_available()

    torch.manual_seed(args['seed'])

    device = torch.device("cuda" if use_cuda else "cpu")

    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}

    data_dir = args['data_dir']

    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST(data_dir, train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=args['batch_size'], shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST(data_dir, train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
        batch_size=1000, shuffle=True, **kwargs)

    hidden_size = args['hidden_size']

    model = Net(hidden_size=hidden_size).to(device)
    optimizer = optim.SGD(model.parameters(), lr=args['lr'],
                          momentum=args['momentum'])

    for epoch in range(1, args['epochs'] + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test_acc = test(args, model, device, test_loader)

        # report intermediate result
        nni.report_intermediate_result(test_acc)
        logger.debug('test accuracy %g', test_acc)
        logger.debug('Pipe send intermediate result done.')

    # report final result
    nni.report_final_result(test_acc)
    logger.debug('Final result is %g', test_acc)
    logger.debug('Send final result done.')


def get_params():
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument("--data_dir", type=str,
                        default='./data', help="data directory")
    parser.add_argument('--batch_size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument("--batch_num", type=int, default=None)
    parser.add_argument("--hidden_size", type=int, default=512, metavar='N',
                        help='hidden layer size (default: 512)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--no_cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--log_interval', type=int, default=1000, metavar='N',
                        help='how many batches to wait before logging training status')

    args, _ = parser.parse_known_args()
    return args


if __name__ == '__main__':
    try:
        # get parameters from tuner
        tuner_params = nni.get_next_parameter()
        logger.debug(tuner_params)
        params = vars(merge_parameter(get_params(), tuner_params))
        print(params)
        main(params)
    except Exception as exception:
        logger.exception(exception)
        raise
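For context on the NNI hooks above, a hedged sketch of how a parameter set drawn from search_space.json overrides the argparse defaults; the values are made up, and a plain dict update approximates what `merge_parameter` does for this flat search space:

```python
# Reuses get_params() from mnist.py above; values below are illustrative.
tuner_params = {'batch_size': 32, 'hidden_size': 1024, 'lr': 0.01, 'momentum': 0.66}
params = vars(get_params())   # argparse defaults
params.update(tuner_params)   # rough equivalent of merge_parameter for flat dicts
print(params['hidden_size'])  # 1024, the tuner's choice wins
```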
2 changes: 2 additions & 0 deletions examples/trials/mnist-sharedstorage/requirements.txt
@@ -0,0 +1,2 @@
torch
torchvision
6 changes: 6 additions & 0 deletions examples/trials/mnist-sharedstorage/search_space.json
@@ -0,0 +1,6 @@
{
    "batch_size": {"_type": "choice", "_value": [16, 32, 64, 128]},
    "hidden_size": {"_type": "choice", "_value": [128, 256, 512, 1024]},
    "lr": {"_type": "choice", "_value": [0.0001, 0.001, 0.01, 0.1]},
    "momentum": {"_type": "uniform", "_value": [0, 1]}
}
11 changes: 11 additions & 0 deletions nni/tools/nnictl/config_schema.py
@@ -142,6 +142,17 @@ def validate(self, data):
            Optional('gpuIndices'): Or(int, And(str, lambda x: len([int(i) for i in x.split(',')]) > 0), error='gpuIndex format error!'),
            Optional('maxTrialNumPerGpu'): setType('maxTrialNumPerGpu', int),
            Optional('useActiveGpu'): setType('useActiveGpu', bool)
        },
        Optional('sharedStorage'): {
            'storageType': setChoice('storageType', 'NFS', 'AzureBlob'),
            Optional('localMountPoint'): setType('localMountPoint', str),
            Optional('remoteMountPoint'): setType('remoteMountPoint', str),
            Optional('nfsServer'): setType('nfsServer', str),
            Optional('storageAccountName'): setType('storageAccountName', str),
            Optional('storageAccountKey'): setType('storageAccountKey', str),
            Optional('containerName'): setType('containerName', str),
            Optional('resourceGroupName'): setType('resourceGroupName', str),
            Optional('localMounted'): setChoice('localMounted', 'usermount', 'nnimount', 'nomount')
        }
    }

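A self-contained approximation of the validation this hunk adds, written with the `schema` package directly instead of NNI's `setType`/`setChoice` wrappers (a sketch, not the exact helpers):

```python
from schema import Optional, Or, Schema

shared_storage_schema = Schema({
    'storageType': Or('NFS', 'AzureBlob'),
    Optional('localMountPoint'): str,
    Optional('remoteMountPoint'): str,
    Optional('nfsServer'): str,
    Optional('storageAccountName'): str,
    Optional('storageAccountKey'): str,
    Optional('containerName'): str,
    Optional('resourceGroupName'): str,
    Optional('localMounted'): Or('usermount', 'nnimount', 'nomount'),
})

# Raises schema.SchemaError on a bad value, e.g. localMounted='foo'.
shared_storage_schema.validate({
    'storageType': 'NFS',
    'localMountPoint': '/mnt/nfs/nni',
    'localMounted': 'nnimount',
})
```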
15 changes: 15 additions & 0 deletions nni/tools/nnictl/launcher.py
@@ -314,6 +314,19 @@ def set_hybrid_config(experiment_config, port, config_file_name):
    #set trial_config
    return set_trial_config(experiment_config, port, config_file_name), err_message

def set_shared_storage(experiment_config, port, config_file_name):
    if 'sharedStorage' in experiment_config:
        response = rest_put(cluster_metadata_url(port), json.dumps({'shared_storage_config': experiment_config['sharedStorage']}), REST_TIME_OUT)
        err_message = None
        if not response or not response.status_code == 200:
            if response is not None:
                err_message = response.text
                _, stderr_full_path = get_log_path(config_file_name)
                with open(stderr_full_path, 'a+') as fout:
                    fout.write(json.dumps(json.loads(err_message), indent=4, sort_keys=True, separators=(',', ':')))
            return False, err_message
    return True, None

def set_experiment(experiment_config, mode, port, config_file_name):
    '''Call startExperiment (rest POST /experiment) with yaml file content'''
    request_data = dict()
@@ -442,6 +455,8 @@ def set_platform_config(platform, experiment_config, port, config_file_name, res
    else:
        raise Exception(ERROR_INFO % 'Unsupported platform!')
        exit(1)
    if config_result:
        config_result, err_msg = set_shared_storage(experiment_config, port, config_file_name)
    if config_result:
        print_normal('Successfully set {0} config!'.format(platform))
    else:
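The same cluster-metadata call `set_shared_storage` issues can be reproduced by hand, which is handy when debugging a rejected config. A sketch with `requests`; the URL is an assumption about what `cluster_metadata_url(port)` resolves to, so adjust the port and path to your running NNI manager:

```python
import json
import requests

payload = {'shared_storage_config': {
    'storageType': 'NFS',
    'localMountPoint': '/mnt/nfs/nni',
    'remoteMountPoint': '/mnt/nfs/nni',
    'nfsServer': '10.0.0.4',
    'exportedDirectory': '/exported/nni',
    'localMounted': 'nnimount',
}}
url = 'http://localhost:8080/api/v1/nni/experiment/cluster-metadata'  # assumed endpoint
resp = requests.put(url, data=json.dumps(payload),
                    headers={'Content-Type': 'application/json'}, timeout=20)
print(resp.status_code, resp.text)
```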
3 changes: 2 additions & 1 deletion ts/nni_manager/core/nnimanager.ts
@@ -631,8 +631,9 @@ class NNIManager implements Manager {
        this.currSubmittedTrialNum++;
        this.log.info(`submitTrialJob: form: ${JSON.stringify(form)}`);
        const trialJobDetail: TrialJobDetail = await this.trainingService.submitTrialJob(form);
        const Snapshot: TrialJobDetail = Object.assign({}, trialJobDetail);
        await this.storeExperimentProfile();
        this.trialJobs.set(trialJobDetail.id, Object.assign({}, trialJobDetail));
        this.trialJobs.set(trialJobDetail.id, Snapshot);
        const trialJobDetailSnapshot: TrialJobDetail | undefined = this.trialJobs.get(trialJobDetail.id);
        if (trialJobDetailSnapshot != undefined) {
            await this.dataStore.storeTrialJobEvent(
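The nnimanager.ts hunk moves the copy of `trialJobDetail` ahead of the awaited `storeExperimentProfile()` call, so that mutations happening while the manager is suspended do not leak into the stored snapshot. A minimal Python analogy of that hazard:

```python
import asyncio

async def main():
    job = {'id': 'abc', 'status': 'WAITING'}
    snapshot = dict(job)               # copy taken before suspending

    async def background_update():
        job['status'] = 'RUNNING'      # concurrent mutation during the await

    await background_update()
    print(job['status'], snapshot['status'])  # RUNNING WAITING

asyncio.run(main())
```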
12 changes: 12 additions & 0 deletions ts/nni_manager/rest_server/restValidationSchemas.ts
@@ -191,6 +191,18 @@ export namespace ValidationSchemas {
            }),
            remote_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
                reuse: joi.boolean()
            }),
            shared_storage_config: joi.object({ // eslint-disable-line @typescript-eslint/camelcase
                storageType: joi.string(),
                localMountPoint: joi.string(),
                remoteMountPoint: joi.string(),
                nfsServer: joi.string(),
                exportedDirectory: joi.string(),
                storageAccountName: joi.string(),
                storageAccountKey: joi.string(),
                containerName: joi.string(),
                resourceGroupName: joi.string(),
                localMounted: joi.string()
            })
        }
    };
ts/nni_manager/training_service/common/trialConfigMetadataKey.ts
@@ -26,5 +26,6 @@ export enum TrialConfigMetadataKey {
    LOG_COLLECTION = 'log_collection',
    // Used to set platform for hybrid in reuse mode;
    // temporary change, will refactor config schema in the future
    PLATFORM_LIST = 'platform_list'
    PLATFORM_LIST = 'platform_list',
    SHARED_STORAGE_CONFIG = 'shared_storage_config'
}
2 changes: 2 additions & 0 deletions ts/nni_manager/training_service/reusable/environment.ts
@@ -77,6 +77,8 @@ export class EnvironmentInformation {

    public environmentService?: EnvironmentService;

    public useSharedStorage?: boolean;

    constructor(id: string, name: string, envId?: string) {
        this.log = getLogger();
        this.id = id;
ts/nni_manager/training_service/reusable/environments/amlEnvironmentService.ts
@@ -16,6 +16,7 @@ import { AMLClusterConfig, AMLEnvironmentInformation, AMLTrialConfig } from '../
import { EnvironmentInformation, EnvironmentService } from '../environment';
import { EventEmitter } from "events";
import { AMLCommandChannel } from '../channels/amlCommandChannel';
import { SharedStorageService } from '../sharedStorage'


/**
@@ -114,9 +115,19 @@ export class AMLEnvironmentService extends EnvironmentService {
        }
        const amlEnvironment: AMLEnvironmentInformation = environment as AMLEnvironmentInformation;
        const environmentLocalTempFolder = path.join(this.experimentRootDir, this.experimentId, "environment-temp");
        environment.command = `import os\nos.system('mv envs outputs/envs && cd outputs && ${amlEnvironment.command}')`;
        environment.useActiveGpu = this.amlClusterConfig.useActiveGpu;
        environment.maxTrialNumberPerGpu = this.amlClusterConfig.maxTrialNumPerGpu;
        if (!fs.existsSync(environmentLocalTempFolder)) {
            await fs.promises.mkdir(environmentLocalTempFolder, {recursive: true});
        }
        if (amlEnvironment.useSharedStorage) {
            const environmentRoot = component.get<SharedStorageService>(SharedStorageService).remoteWorkingRoot;
            const remoteMountCommand = component.get<SharedStorageService>(SharedStorageService).remoteMountCommand;
            amlEnvironment.command = `${remoteMountCommand} && cd ${environmentRoot} && ${amlEnvironment.command}`.replace(/"/g, `\\"`);
        } else {
            amlEnvironment.command = `mv envs outputs/envs && cd outputs && ${amlEnvironment.command}`;
        }
        amlEnvironment.command = `import os\nos.system('${amlEnvironment.command}')`;
        amlEnvironment.useActiveGpu = this.amlClusterConfig.useActiveGpu;
        amlEnvironment.maxTrialNumberPerGpu = this.amlClusterConfig.maxTrialNumPerGpu;
        await fs.promises.writeFile(path.join(environmentLocalTempFolder, 'nni_script.py'), amlEnvironment.command, { encoding: 'utf8' });
        const amlClient = new AMLClient(
            this.amlClusterConfig.subscriptionId,
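To make the command rewriting above concrete, a hedged Python sketch of what the generated nni_script.py ends up containing when shared storage is enabled; the mount command and working-root layout are placeholders standing in for what `SharedStorageService` actually provides:

```python
# Placeholder inputs; real values come from SharedStorageService.
remote_mount_command = 'sudo mount -t nfs 10.0.0.4:/exported/nni /mnt/nni'
environment_root = '/mnt/nni/envs/hypothetical-env-id'
trial_command = 'sh ./run_environment.sh'     # hypothetical inner command

command = f'{remote_mount_command} && cd {environment_root} && {trial_command}'
command = command.replace('"', '\\"')         # mirrors .replace(/"/g, `\\"`)
script = f"import os\nos.system('{command}')"
print(script)                                 # what gets written to nni_script.py
```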