From c33702f8bbe8bdf4f8d4da0d10cbe9001d9c4679 Mon Sep 17 00:00:00 2001 From: Yota Hamada Date: Mon, 10 Oct 2022 20:05:43 +0900 Subject: [PATCH] Add SSH Executor (#351) * fix bug in examples * Add SSH Executor * Update README * fix test * Update README --- README.md | 35 ++++++++- examples/docker-container.yaml | 7 +- examples/ssh-command.yaml | 10 +++ go.mod | 1 + go.sum | 2 + internal/dag/builder.go | 23 ++++-- internal/dag/builder_test.go | 5 +- internal/executor/ssh.go | 138 +++++++++++++++++++++++++++++++++ 8 files changed, 206 insertions(+), 15 deletions(-) create mode 100644 examples/ssh-command.yaml create mode 100644 internal/executor/ssh.go diff --git a/README.md b/README.md index 2765cb008..be95b1bd6 100644 --- a/README.md +++ b/README.md @@ -31,9 +31,17 @@ It runs DAGs (Dir - Schedule executions of DAGs with Cron expressions - Define dependencies between related jobs and represent them as a single DAG (unit of execution) +## Features +- Web UI to edit DAGs, view past execution logs and history +- Ability to run/stop/retry DAGs from the Web UI or command +- Support for command execution on remote hosts via SSH +- Ability to send mail notifications on error or success of a DAG +- Various options to define DAG specification (e.g. environment variables, parameters, conditional logic, etc) + ## Contents - [Highlights](#highlights) +- [Features](#features) - [Contents](#contents) - [Getting started](#getting-started) - [Motivation](#motivation) @@ -42,7 +50,7 @@ It runs DAGs (Dir - [Install `dagu`](#install-dagu) - [via Homebrew](#via-homebrew) - [via Bash script](#via-bash-script) - - [via Docker Image](#via-docker-image) + - [via Docker](#via-docker) - [via GitHub Release Page](#via-github-release-page) - [️Quick start](#️quick-start) - [1. Launch the Web UI](#1-launch-the-web-ui) @@ -65,6 +73,7 @@ It runs DAGs (Dir - [Other Available Fields](#other-available-fields) - [Executor](#executor) - [HTTP Executor](#http-executor) + - [SSH Executor](#ssh-executor) - [Admin Configuration](#admin-configuration) - [Environment Variable](#environment-variable) - [Sending email notifications](#sending-email-notifications) @@ -125,7 +134,7 @@ brew upgrade yohamta/tap/dagu curl -L https://raw.githubusercontent.com/yohamta/dagu/main/scripts/downloader.sh | bash ``` -### via Docker Image +### via Docker ```sh docker run \ @@ -450,7 +459,7 @@ Executor is a different way of executing a Step; Executor can be set in the `exe ### HTTP Executor -The HTTP Executor allows you to send arbitrary HTTP requests. +The HTTP Executor allows us to send arbitrary HTTP requests. ```yaml steps: @@ -470,6 +479,24 @@ steps: } ``` +### SSH Executor + +The SSH Executor allows us to execute arbitrary command on a remote host. + +```yaml +steps: + - name: step1 + executor: + type: ssh + config: + user: dagu + ip: XXX.XXX.XXX.XXX + port: 22 + key: /Users/dagu/.ssh/private.pem + StrictHostKeyChecking: false + command: /usr/sbin/ifconfig +``` + ## Admin Configuration To configure dagu, please create the config file (default path: `~/.dagu/admin.yaml`). All fields are optional. @@ -505,7 +532,7 @@ You can configure the dagu's internal work directory by defining `DAGU_HOME` env ## Sending email notifications -Email notifications can be sent when a DAG finished with an error or successfully. To do so, you can set the `smtp` field and related fields in the DAG specs. You can use any email delivery services (e.g., Sendgrid, Mailgun, etc). +Email notifications can be sent when a DAG finished with an error or successfully. To do so, you can set the `smtp` field and related fields in the DAG specs. You can use any email delivery services (e.g. Sendgrid, Mailgun, etc). ```yaml # Eamil notification settings diff --git a/examples/docker-container.yaml b/examples/docker-container.yaml index 97e8b6718..196b1bab4 100644 --- a/examples/docker-container.yaml +++ b/examples/docker-container.yaml @@ -1,7 +1,8 @@ steps: - name: step1 executor: - - type: docker - hostname: localhost - image: node:latest + type: docker + config: + host: localhost + image: node:latest command: npm init -y diff --git a/examples/ssh-command.yaml b/examples/ssh-command.yaml new file mode 100644 index 000000000..a220b465a --- /dev/null +++ b/examples/ssh-command.yaml @@ -0,0 +1,10 @@ +steps: + - name: step1 + executor: + type: ssh + config: + user: ec2-user + ip: "35.78.67.103" + key: /Users/hamadayouta/.ssh/prod-ec2instance-keypair.pem + StrictHostKeyChecking: false + command: /usr/sbin/ifconfig diff --git a/go.mod b/go.mod index 472d8f49a..8bfe20b14 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/rivo/uniseg v0.3.4 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/samber/lo v1.27.0 + golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index f0fc8bacd..859998896 100644 --- a/go.sum +++ b/go.sum @@ -51,6 +51,8 @@ github.com/urfave/cli/v2 v2.4.5 h1:AWCiaqBc+38MxX6nJfjRQyyd2Gq50sOan+AEyv/vFhM= github.com/urfave/cli/v2 v2.4.5/go.mod h1:oDzoM7pVwz6wHn5ogWgFUU1s4VJayeQS+aEZDqXIEJs= github.com/yohamta/grep v1.0.0 h1:gCz7u8+caSqLNnY7LehatRnMBMTKOd4iiLClznHNcJI= github.com/yohamta/grep v1.0.0/go.mod h1:WEl5AeArgNwJmGvsEHr0WC4pqm0YWaWz3UId0V5D0hs= +golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b h1:huxqepDufQpLLIRXiVkTvnxrzJlpwmIWAObmcCcUFr0= +golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM= golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= diff --git a/internal/dag/builder.go b/internal/dag/builder.go index 7b4cb9e9e..fd3797afc 100644 --- a/internal/dag/builder.go +++ b/internal/dag/builder.go @@ -375,21 +375,32 @@ func (b *builder) buildStep(variables []string, def *stepDef) (*Step, error) { case map[interface{}]interface{}: for k, v := range val { if k, ok := k.(string); ok { - if v, ok := v.(string); ok { - if k == "type" { + switch k { + case "type": + if v, ok := v.(string); ok { step.ExecutorConfig.Type = v } else { - step.ExecutorConfig.Config[k] = v + return nil, fmt.Errorf("invalid value for type: %s", v) + } + case "config": + if v, ok := v.(map[interface{}]interface{}); ok { + for k, v := range v { + if k, ok := k.(string); ok { + step.ExecutorConfig.Config[k] = v + } else { + return nil, fmt.Errorf("invalid key for config: %s", k) + } + } + } else { + return nil, fmt.Errorf("invalid value for config: %s", v) } - } else { - return nil, fmt.Errorf("invalid value for executor %s", v) } } else { return nil, fmt.Errorf("invalid executor config key %s", k) } } default: - return nil, fmt.Errorf("invalid executor config") + return nil, fmt.Errorf("invalid executor config type: %t values: %v", val, val) } } // TODO: validate executor config diff --git a/internal/dag/builder_test.go b/internal/dag/builder_test.go index a75f8c73a..e8999ce6d 100644 --- a/internal/dag/builder_test.go +++ b/internal/dag/builder_test.go @@ -388,11 +388,12 @@ steps: command: echo 1 executor: type: http - config: some option + config: + key: value `, expectedExec: "http", expectedConfig: map[string]interface{}{ - "config": "some option", + "key": "value", }, }, } diff --git a/internal/executor/ssh.go b/internal/executor/ssh.go new file mode 100644 index 000000000..4b1f00139 --- /dev/null +++ b/internal/executor/ssh.go @@ -0,0 +1,138 @@ +package executor + +import ( + "context" + "fmt" + "io" + "net" + "os" + "strings" + + "github.com/mitchellh/mapstructure" + "github.com/yohamta/dagu/internal/dag" + "golang.org/x/crypto/ssh" +) + +type SSHConfig struct { + User string + IP string + Port int + Key string + StrictHostKeyChecking bool +} + +type SSHExecutor struct { + step *dag.Step + config *SSHConfig + sshConfig *ssh.ClientConfig + stdout io.Writer + addr net.Addr + session *ssh.Session +} + +func (e *SSHExecutor) SetStdout(out io.Writer) { + e.stdout = out +} + +func (e *SSHExecutor) SetStderr(out io.Writer) { + e.stdout = out +} + +func (e *SSHExecutor) Kill(sig os.Signal) error { + if e.session != nil { + return e.session.Close() + } + return nil +} + +func (e *SSHExecutor) Run() error { + addr := fmt.Sprintf("%s:%d", e.config.IP, e.config.Port) + conn, err := ssh.Dial("tcp", addr, e.sshConfig) + if err != nil { + return err + } + + session, err := conn.NewSession() + if err != nil { + return err + } + e.session = session + defer session.Close() + + // Once a Session is created, you can execute a single command on + // the remote side using the Run method. + session.Stdout = e.stdout + session.Stderr = e.stdout + command := strings.Join(append([]string{e.step.Command}, e.step.Args...), " ") + return session.Run(command) +} + +func CreateSSHExecutor(ctx context.Context, step *dag.Step) (Executor, error) { + cfg := &SSHConfig{} + md, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{Result: cfg}) + + if err != nil { + return nil, err + } + + if err := md.Decode(step.ExecutorConfig.Config); err != nil { + return nil, err + } + + if cfg.Port == 0 { + cfg.Port = 22 + } + + if cfg.StrictHostKeyChecking { + return nil, fmt.Errorf("StrictHostKeyChecking is not supported yet") + } + + // Create the Signer for this private key. + signer, err := getPublicKeySigner(cfg.Key) + if err != nil { + return nil, err + } + + sshConfig := &ssh.ClientConfig{ + User: cfg.User, + Auth: []ssh.AuthMethod{ + // Use the PublicKeys method for remote authentication. + ssh.PublicKeys(signer), + }, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + } + + return &SSHExecutor{ + step: step, + config: cfg, + sshConfig: sshConfig, + stdout: os.Stdout, + }, nil +} + +// referenced code: +// https://go.googlesource.com/crypto/+/master/ssh/example_test.go +// https://gist.github.com/boyzhujian/73b5ecd37efd6f8dd38f56e7588f1b58 +func getPublicKeySigner(path string) (ssh.Signer, error) { + // A public key may be used to authenticate against the remote + // server by using an unencrypted PEM-encoded private key file. + // + // If you have an encrypted private key, the crypto/x509 package + // can be used to decrypt it. + key, err := os.ReadFile(path) + if err != nil { + return nil, err + } + + // Create the Signer for this private key. + signer, err := ssh.ParsePrivateKey(key) + if err != nil { + return nil, err + } + + return signer, nil +} + +func init() { + Register("ssh", CreateSSHExecutor) +}