Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Add sshbarrier to ssh plugin (#3587)
Browse files Browse the repository at this point in the history
* Add ssh barrier
- set sshbarrier to true to enable ssh barrier
- set sshbarriertaskroles to identify specific task roles that barrier wait for ssh setup. If no sshbarriertaskroles, all the barrier will wait for all task roles.
* Add error spec for SSH barrier.
  • Loading branch information
wangdian authored Sep 10, 2019
1 parent 86056e9 commit 91a6d2a
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 6 deletions.
18 changes: 18 additions & 0 deletions src/k8s-job-exit-spec/config/k8s-job-exit-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,24 @@ spec:
# Description: User Container issued failures:
# -> Voluntary failures caused by Container itself
###########################################################################
- code: 240
phrase: PAIRuntimeSSHBarrierTimeout
issuer: PAI_RUNTIME
causer: PAI_RUNTIME
type: PLATFORM_FAILURE
stage: RUNNING
behavior: PERMANENT
reaction: NEVER_RETRY
reason: "SSH barrier reaches max retry count"
repro:
- "SSH barrier reaches max retry count, please check if SSH plugin is correctly configured"
solution:
- "Check job config to confirm all SSH barrier relied task roles enabled SSH"
pattern:
runtimeContainerPatterns:
- exitCode: 10
platformLogRegex: "SSH barrier reaches max retry count"

- code: 250
phrase: FrameworkBarrierTransientFailed
issuer: PAI_RUNTIME
Expand Down
7 changes: 6 additions & 1 deletion src/kube-runtime/src/plugins/ssh/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ extras:
- plugin: ssh
parameters:
jobssh: boolean
sshbarrier: boolean
sshbarriertaskroles:
- taskrole
userssh:
type: string
value: string
```
- jobssh: true to enable job container wise ssh, false to disable.
- userssh: currently the userssh type should be system|custom. Type system means the value is a key stored in PAI, and type custom means the value is the string defined in job config.
- sshbarrier: if set to true, wait until can ssh to all corresponding job containers. If not set, the defalut value is false.
- sshbarriertaskroles: only valid if sshbarrier set to true. Defines the task roles that the barrier will test ssh to. If not defind, all taskroles will be included.
- userssh: currently the userssh type should be ```custom```. Type ```custom``` means use the userssh value as the SSH public key to run job. User can use the corresponding SSH private key to connect to job container.
19 changes: 14 additions & 5 deletions src/kube-runtime/src/plugins/ssh/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,27 @@
if __name__ == "__main__":
[parameters, pre_script, post_script] = plugin_init()

cmdParams = []
if parameters is not None:
if "jobssh" in parameters:
cmdParams.append(str(parameters["jobssh"]).lower())
jobssh = str(parameters["jobssh"]).lower()
else:
cmdParams.append("false")
jobssh = "false"
cmdParams = [jobssh]

if "userssh" in parameters:
if "type" in parameters["userssh"] and "value" in parameters["userssh"]:
cmdParams.append(str(parameters["userssh"]["type"]))
cmdParams.append("\'{}\'".format(parameters["userssh"]["value"]))

# write call to real executable script
command = "{}/sshd.sh {}\n".format(os.path.dirname(os.path.abspath(__file__)), " ".join(cmdParams))
inject_commands([command], pre_script)
command = ["{}/sshd.sh {}\n".format(os.path.dirname(os.path.abspath(__file__)), " ".join(cmdParams))]

# ssh barrier
if jobssh == "true" and "sshbarrier" in parameters and str(parameters["sshbarrier"]).lower() == "true":
if "sshbarriertaskroles" in parameters:
barrierParams = " ".join('"{}"'.format(tr) for tr in parameters["sshbarriertaskroles"])
else:
barrierParams = ""
command.append("{}/sshbarrier.sh {}\n".format(os.path.dirname(os.path.abspath(__file__)), barrierParams))

inject_commands(command, pre_script)
86 changes: 86 additions & 0 deletions src/kube-runtime/src/plugins/ssh/sshbarrier.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/bin/bash
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

# no set -o errexit because use exitcode to judge ssh connectivity
# no set -o nounset because use empty array to judge end
set -o pipefail

readonly MAX_RETRY_COUNT=20
readonly RETRY_INTERVAL=1

function check_ssh_connection()
{
ssh -q -o BatchMode=yes -o StrictHostKeyChecking=no $1 "exit 0"
_RCODE=$?
return $_RCODE
}

taskRolesToCheck=()
for barrierTaskRole in $@; do
taskRolesToCheck+=($barrierTaskRole)
done

instancesToCheck=()
# Set ssh config for all task role instances
taskRoleInstances=(${PAI_TASK_ROLE_INSTANCES//,/ })
for i in "${taskRoleInstances[@]}"; do
instancePair=(${i//:/ })
taskRole=${instancePair[0]}
index=${instancePair[1]}

if [[ $taskRole = $FC_TASKROLE_NAME ]] && [[ $index = $FC_TASK_INDEX ]]; then
continue
fi

# If barrier task roles defined, then only check instances for defined task roles. Otherwise check all instances.
if [[ ${#taskRolesToCheck[@]} != 0 ]]; then
if [[ ${taskRolesToCheck[@]} =~ ${taskRole} ]]; then
instancesToCheck+=("${taskRole}-${index}")
fi
else
instancesToCheck+=("${taskRole}-${index}")
fi
done

retryCount=0
while true
do
echo "Trying to SSH to instances: ${instancesToCheck[*]}"

instanceFailed=()
for instance in "${instancesToCheck[@]}"; do
check_ssh_connection "$instance"
if [[ $? != 0 ]]; then
instanceFailed+=("$instance")
fi
done

[[ ${#instanceFailed[@]} = 0 ]] && break

if (( $retryCount >= $MAX_RETRY_COUNT )); then
echo "SSH barrier reaches max retry count. Failed instances: ${instancesToCheck[*]} Exit..." >&2
exit 10
fi

instancesToCheck=(${instanceFailed[*]})
((retryCount++))

sleep $RETRY_INTERVAL
done

echo "All ssh connections are established, continue..."
4 changes: 4 additions & 0 deletions src/kube-runtime/src/plugins/ssh/sshd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

set -o errexit
set -o nounset
set -o pipefail

PAI_WORK_DIR=/usr/local/pai
SSH_DIR=/root/.ssh

Expand Down
59 changes: 59 additions & 0 deletions src/kube-runtime/test/sshbarrier_test_job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
protocolVersion: 2
name: sshbarrier_test_job
type: job
version: horovod0.16.4-tf1.12.0-torch1.1.0-mxnet1.4.1-py3.5
contributor: OpenPAI
description: |
This is a distributed synthetic benchmark for Horovod with PyTorch backend running on OpenPAI.
It runs [Horovod with Open MPI](https://github.com/horovod/horovod/blob/master/docs/mpirun.rst).
parameters:
model: resnet50
batchsize: 64

prerequisites:
- protocolVersion: 2
name: horovod_official
type: dockerimage
contributor : Horovod
uri : horovod/horovod:0.16.4-tf1.12.0-torch1.1.0-mxnet1.4.1-py3.5

taskRoles:
master:
instances: 1
completion:
minSucceededInstances: 1
dockerImage: horovod_official
resourcePerInstance:
cpu: 8
memoryMB: 16384
gpu: 2
commands:
- sleep 10
- >
horovodrun -np 4 -H master-0:2,worker-0:2
python pytorch_synthetic_benchmark.py
--model <% $parameters.model %>
--batch-size <% $parameters.batchsize %>
worker:
instances: 1
dockerImage: horovod_official
resourcePerInstance:
cpu: 8
memoryMB: 16384
gpu: 2
commands:
- sleep infinity

extras:
com.microsoft.pai.runtimeplugin:
- plugin: ssh
taskroles:
- master
parameters:
jobssh: true
sshbarrier: true
- plugin: ssh
taskroles:
- worker
parameters:
jobssh: true
10 changes: 10 additions & 0 deletions src/kube-runtime/test/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ def test_ssh_plugin(self):
commands = [[],[]]
init_plugins(jobconfig, commands, "../src/plugins", ".", "worker")

def test_ssh_plugin_barrier(self):
job_path = "sshbarrier_test_job.yaml"
if os.path.exists(job_path):
with open(job_path, 'rt') as f:
jobconfig = yaml.load(f)
commands = [[],[]]
init_plugins(jobconfig, commands, "../src/plugins", ".", "master")
commands = [[],[]]
init_plugins(jobconfig, commands, "../src/plugins", ".", "worker")


if __name__ == '__main__':
unittest.main()

0 comments on commit 91a6d2a

Please sign in to comment.