Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Add sshbarrier to ssh plugin #3587

Merged
merged 9 commits into from
Sep 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/k8s-job-exit-spec/config/k8s-job-exit-spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,24 @@ spec:
# Description: User Container issued failures:
# -> Voluntary failures caused by Container itself
###########################################################################
- code: 240
phrase: PAIRuntimeSSHBarrierTimeout
issuer: PAI_RUNTIME
causer: PAI_RUNTIME
type: PLATFORM_FAILURE
stage: RUNNING
behavior: PERMANENT
reaction: NEVER_RETRY
reason: "SSH barrier reaches max retry count"
repro:
- "SSH barrier reaches max retry count, please check if SSH plugin is correctly configured"
solution:
- "Check job config to confirm all SSH barrier relied task roles enabled SSH"
pattern:
runtimeContainerPatterns:
- exitCode: 10
platformLogRegex: "SSH barrier reaches max retry count"

- code: 250
phrase: FrameworkBarrierTransientFailed
issuer: PAI_RUNTIME
Expand Down
7 changes: 6 additions & 1 deletion src/kube-runtime/src/plugins/ssh/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ extras:
- plugin: ssh
parameters:
jobssh: boolean
sshbarrier: boolean
sshbarriertaskroles:
- taskrole
userssh:
type: string
value: string
```
- jobssh: true to enable job container wise ssh, false to disable.
- userssh: currently the userssh type should be system|custom. Type system means the value is a key stored in PAI, and type custom means the value is the string defined in job config.
- sshbarrier: if set to true, wait until can ssh to all corresponding job containers. If not set, the defalut value is false.
- sshbarriertaskroles: only valid if sshbarrier set to true. Defines the task roles that the barrier will test ssh to. If not defind, all taskroles will be included.
- userssh: currently the userssh type should be ```custom```. Type ```custom``` means use the userssh value as the SSH public key to run job. User can use the corresponding SSH private key to connect to job container.
19 changes: 14 additions & 5 deletions src/kube-runtime/src/plugins/ssh/init.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,27 @@
if __name__ == "__main__":
[parameters, pre_script, post_script] = plugin_init()

cmdParams = []
if parameters is not None:
if "jobssh" in parameters:
cmdParams.append(str(parameters["jobssh"]).lower())
jobssh = str(parameters["jobssh"]).lower()
else:
cmdParams.append("false")
jobssh = "false"
cmdParams = [jobssh]

if "userssh" in parameters:
if "type" in parameters["userssh"] and "value" in parameters["userssh"]:
cmdParams.append(str(parameters["userssh"]["type"]))
cmdParams.append("\'{}\'".format(parameters["userssh"]["value"]))

# write call to real executable script
command = "{}/sshd.sh {}\n".format(os.path.dirname(os.path.abspath(__file__)), " ".join(cmdParams))
inject_commands([command], pre_script)
command = ["{}/sshd.sh {}\n".format(os.path.dirname(os.path.abspath(__file__)), " ".join(cmdParams))]

# ssh barrier
if jobssh == "true" and "sshbarrier" in parameters and str(parameters["sshbarrier"]).lower() == "true":
if "sshbarriertaskroles" in parameters:
barrierParams = " ".join('"{}"'.format(tr) for tr in parameters["sshbarriertaskroles"])
else:
barrierParams = ""
command.append("{}/sshbarrier.sh {}\n".format(os.path.dirname(os.path.abspath(__file__)), barrierParams))

inject_commands(command, pre_script)
86 changes: 86 additions & 0 deletions src/kube-runtime/src/plugins/ssh/sshbarrier.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
#!/bin/bash
# Copyright (c) Microsoft Corporation
# All rights reserved.
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the "Software"), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
# to permit persons to whom the Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED *AS IS*, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
# BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

# no set -o errexit because use exitcode to judge ssh connectivity
# no set -o nounset because use empty array to judge end
set -o pipefail

readonly MAX_RETRY_COUNT=20
readonly RETRY_INTERVAL=1

function check_ssh_connection()
{
ssh -q -o BatchMode=yes -o StrictHostKeyChecking=no $1 "exit 0"
_RCODE=$?
return $_RCODE
}

taskRolesToCheck=()
for barrierTaskRole in $@; do
taskRolesToCheck+=($barrierTaskRole)
done

instancesToCheck=()
# Set ssh config for all task role instances
taskRoleInstances=(${PAI_TASK_ROLE_INSTANCES//,/ })
for i in "${taskRoleInstances[@]}"; do
instancePair=(${i//:/ })
taskRole=${instancePair[0]}
index=${instancePair[1]}

if [[ $taskRole = $FC_TASKROLE_NAME ]] && [[ $index = $FC_TASK_INDEX ]]; then
continue
fi

# If barrier task roles defined, then only check instances for defined task roles. Otherwise check all instances.
if [[ ${#taskRolesToCheck[@]} != 0 ]]; then
if [[ ${taskRolesToCheck[@]} =~ ${taskRole} ]]; then
instancesToCheck+=("${taskRole}-${index}")
fi
else
instancesToCheck+=("${taskRole}-${index}")
fi
done

retryCount=0
while true
do
echo "Trying to SSH to instances: ${instancesToCheck[*]}"

instanceFailed=()
for instance in "${instancesToCheck[@]}"; do
check_ssh_connection "$instance"
if [[ $? != 0 ]]; then
instanceFailed+=("$instance")
fi
done

[[ ${#instanceFailed[@]} = 0 ]] && break

if (( $retryCount >= $MAX_RETRY_COUNT )); then
echo "SSH barrier reaches max retry count. Failed instances: ${instancesToCheck[*]} Exit..." >&2
exit 10
fi

instancesToCheck=(${instanceFailed[*]})
((retryCount++))

sleep $RETRY_INTERVAL
done

echo "All ssh connections are established, continue..."
4 changes: 4 additions & 0 deletions src/kube-runtime/src/plugins/ssh/sshd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

set -o errexit
set -o nounset
set -o pipefail

PAI_WORK_DIR=/usr/local/pai
SSH_DIR=/root/.ssh

Expand Down
59 changes: 59 additions & 0 deletions src/kube-runtime/test/sshbarrier_test_job.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
protocolVersion: 2
name: sshbarrier_test_job
type: job
version: horovod0.16.4-tf1.12.0-torch1.1.0-mxnet1.4.1-py3.5
contributor: OpenPAI
description: |
This is a distributed synthetic benchmark for Horovod with PyTorch backend running on OpenPAI.
It runs [Horovod with Open MPI](https://github.com/horovod/horovod/blob/master/docs/mpirun.rst).
parameters:
model: resnet50
batchsize: 64

prerequisites:
- protocolVersion: 2
name: horovod_official
type: dockerimage
contributor : Horovod
uri : horovod/horovod:0.16.4-tf1.12.0-torch1.1.0-mxnet1.4.1-py3.5

taskRoles:
master:
instances: 1
completion:
minSucceededInstances: 1
dockerImage: horovod_official
resourcePerInstance:
cpu: 8
memoryMB: 16384
gpu: 2
commands:
- sleep 10
- >
horovodrun -np 4 -H master-0:2,worker-0:2
python pytorch_synthetic_benchmark.py
--model <% $parameters.model %>
--batch-size <% $parameters.batchsize %>
worker:
instances: 1
dockerImage: horovod_official
resourcePerInstance:
cpu: 8
memoryMB: 16384
gpu: 2
commands:
- sleep infinity

extras:
com.microsoft.pai.runtimeplugin:
- plugin: ssh
taskroles:
- master
parameters:
jobssh: true
sshbarrier: true
- plugin: ssh
taskroles:
- worker
parameters:
jobssh: true
10 changes: 10 additions & 0 deletions src/kube-runtime/test/test_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ def test_ssh_plugin(self):
commands = [[],[]]
init_plugins(jobconfig, commands, "../src/plugins", ".", "worker")

def test_ssh_plugin_barrier(self):
job_path = "sshbarrier_test_job.yaml"
if os.path.exists(job_path):
with open(job_path, 'rt') as f:
jobconfig = yaml.load(f)
commands = [[],[]]
init_plugins(jobconfig, commands, "../src/plugins", ".", "master")
commands = [[],[]]
init_plugins(jobconfig, commands, "../src/plugins", ".", "worker")


if __name__ == '__main__':
unittest.main()