Skip to content

Commit

Permalink
Add SSH Executor (#351)
Browse files Browse the repository at this point in the history
* fix bug in examples

* Add SSH Executor

* Update README

* fix test

* Update README
  • Loading branch information
yottahmd authored Oct 10, 2022
1 parent 82a4cb2 commit c33702f
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 15 deletions.
35 changes: 31 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,17 @@ It runs <a href="https://en.wikipedia.org/wiki/Directed_acyclic_graph">DAGs (Dir
- Schedule executions of DAGs with Cron expressions
- Define dependencies between related jobs and represent them as a single DAG (unit of execution)

## Features
- Web UI to edit DAGs, view past execution logs and history
- Ability to run/stop/retry DAGs from the Web UI or command
- Support for command execution on remote hosts via SSH
- Ability to send mail notifications on error or success of a DAG
- Various options to define DAG specification (e.g. environment variables, parameters, conditional logic, etc)

## Contents

- [Highlights](#highlights)
- [Features](#features)
- [Contents](#contents)
- [Getting started](#getting-started)
- [Motivation](#motivation)
Expand All @@ -42,7 +50,7 @@ It runs <a href="https://en.wikipedia.org/wiki/Directed_acyclic_graph">DAGs (Dir
- [Install `dagu`](#install-dagu)
- [via Homebrew](#via-homebrew)
- [via Bash script](#via-bash-script)
- [via Docker Image](#via-docker-image)
- [via Docker](#via-docker)
- [via GitHub Release Page](#via-github-release-page)
- [️Quick start](#️quick-start)
- [1. Launch the Web UI](#1-launch-the-web-ui)
Expand All @@ -65,6 +73,7 @@ It runs <a href="https://en.wikipedia.org/wiki/Directed_acyclic_graph">DAGs (Dir
- [Other Available Fields](#other-available-fields)
- [Executor](#executor)
- [HTTP Executor](#http-executor)
- [SSH Executor](#ssh-executor)
- [Admin Configuration](#admin-configuration)
- [Environment Variable](#environment-variable)
- [Sending email notifications](#sending-email-notifications)
Expand Down Expand Up @@ -125,7 +134,7 @@ brew upgrade yohamta/tap/dagu
curl -L https://raw.githubusercontent.com/yohamta/dagu/main/scripts/downloader.sh | bash
```

### via Docker Image
### via Docker

```sh
docker run \
Expand Down Expand Up @@ -450,7 +459,7 @@ Executor is a different way of executing a Step; Executor can be set in the `exe

### HTTP Executor

The HTTP Executor allows you to send arbitrary HTTP requests.
The HTTP Executor allows us to send arbitrary HTTP requests.

```yaml
steps:
Expand All @@ -470,6 +479,24 @@ steps:
}
```

### SSH Executor

The SSH Executor allows us to execute arbitrary command on a remote host.

```yaml
steps:
- name: step1
executor:
type: ssh
config:
user: dagu
ip: XXX.XXX.XXX.XXX
port: 22
key: /Users/dagu/.ssh/private.pem
StrictHostKeyChecking: false
command: /usr/sbin/ifconfig
```

## Admin Configuration

To configure dagu, please create the config file (default path: `~/.dagu/admin.yaml`). All fields are optional.
Expand Down Expand Up @@ -505,7 +532,7 @@ You can configure the dagu's internal work directory by defining `DAGU_HOME` env

## Sending email notifications

Email notifications can be sent when a DAG finished with an error or successfully. To do so, you can set the `smtp` field and related fields in the DAG specs. You can use any email delivery services (e.g., Sendgrid, Mailgun, etc).
Email notifications can be sent when a DAG finished with an error or successfully. To do so, you can set the `smtp` field and related fields in the DAG specs. You can use any email delivery services (e.g. Sendgrid, Mailgun, etc).

```yaml
# Eamil notification settings
Expand Down
7 changes: 4 additions & 3 deletions examples/docker-container.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
steps:
- name: step1
executor:
- type: docker
hostname: localhost
image: node:latest
type: docker
config:
host: localhost
image: node:latest
command: npm init -y
10 changes: 10 additions & 0 deletions examples/ssh-command.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
steps:
- name: step1
executor:
type: ssh
config:
user: ec2-user
ip: "35.78.67.103"
key: /Users/hamadayouta/.ssh/prod-ec2instance-keypair.pem
StrictHostKeyChecking: false
command: /usr/sbin/ifconfig
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/rivo/uniseg v0.3.4 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/samber/lo v1.27.0
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b
golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/urfave/cli/v2 v2.4.5 h1:AWCiaqBc+38MxX6nJfjRQyyd2Gq50sOan+AEyv/vFhM=
github.com/urfave/cli/v2 v2.4.5/go.mod h1:oDzoM7pVwz6wHn5ogWgFUU1s4VJayeQS+aEZDqXIEJs=
github.com/yohamta/grep v1.0.0 h1:gCz7u8+caSqLNnY7LehatRnMBMTKOd4iiLClznHNcJI=
github.com/yohamta/grep v1.0.0/go.mod h1:WEl5AeArgNwJmGvsEHr0WC4pqm0YWaWz3UId0V5D0hs=
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b h1:huxqepDufQpLLIRXiVkTvnxrzJlpwmIWAObmcCcUFr0=
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
Expand Down
23 changes: 17 additions & 6 deletions internal/dag/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,21 +375,32 @@ func (b *builder) buildStep(variables []string, def *stepDef) (*Step, error) {
case map[interface{}]interface{}:
for k, v := range val {
if k, ok := k.(string); ok {
if v, ok := v.(string); ok {
if k == "type" {
switch k {
case "type":
if v, ok := v.(string); ok {
step.ExecutorConfig.Type = v
} else {
step.ExecutorConfig.Config[k] = v
return nil, fmt.Errorf("invalid value for type: %s", v)
}
case "config":
if v, ok := v.(map[interface{}]interface{}); ok {
for k, v := range v {
if k, ok := k.(string); ok {
step.ExecutorConfig.Config[k] = v
} else {
return nil, fmt.Errorf("invalid key for config: %s", k)
}
}
} else {
return nil, fmt.Errorf("invalid value for config: %s", v)
}
} else {
return nil, fmt.Errorf("invalid value for executor %s", v)
}
} else {
return nil, fmt.Errorf("invalid executor config key %s", k)
}
}
default:
return nil, fmt.Errorf("invalid executor config")
return nil, fmt.Errorf("invalid executor config type: %t values: %v", val, val)
}
}
// TODO: validate executor config
Expand Down
5 changes: 3 additions & 2 deletions internal/dag/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,12 @@ steps:
command: echo 1
executor:
type: http
config: some option
config:
key: value
`,
expectedExec: "http",
expectedConfig: map[string]interface{}{
"config": "some option",
"key": "value",
},
},
}
Expand Down
138 changes: 138 additions & 0 deletions internal/executor/ssh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package executor

import (
"context"
"fmt"
"io"
"net"
"os"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/yohamta/dagu/internal/dag"
"golang.org/x/crypto/ssh"
)

type SSHConfig struct {
User string
IP string
Port int
Key string
StrictHostKeyChecking bool
}

type SSHExecutor struct {
step *dag.Step
config *SSHConfig
sshConfig *ssh.ClientConfig
stdout io.Writer
addr net.Addr
session *ssh.Session
}

func (e *SSHExecutor) SetStdout(out io.Writer) {
e.stdout = out
}

func (e *SSHExecutor) SetStderr(out io.Writer) {
e.stdout = out
}

func (e *SSHExecutor) Kill(sig os.Signal) error {
if e.session != nil {
return e.session.Close()
}
return nil
}

func (e *SSHExecutor) Run() error {
addr := fmt.Sprintf("%s:%d", e.config.IP, e.config.Port)
conn, err := ssh.Dial("tcp", addr, e.sshConfig)
if err != nil {
return err
}

session, err := conn.NewSession()
if err != nil {
return err
}
e.session = session
defer session.Close()

// Once a Session is created, you can execute a single command on
// the remote side using the Run method.
session.Stdout = e.stdout
session.Stderr = e.stdout
command := strings.Join(append([]string{e.step.Command}, e.step.Args...), " ")
return session.Run(command)
}

func CreateSSHExecutor(ctx context.Context, step *dag.Step) (Executor, error) {
cfg := &SSHConfig{}
md, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{Result: cfg})

if err != nil {
return nil, err
}

if err := md.Decode(step.ExecutorConfig.Config); err != nil {
return nil, err
}

if cfg.Port == 0 {
cfg.Port = 22
}

if cfg.StrictHostKeyChecking {
return nil, fmt.Errorf("StrictHostKeyChecking is not supported yet")
}

// Create the Signer for this private key.
signer, err := getPublicKeySigner(cfg.Key)
if err != nil {
return nil, err
}

sshConfig := &ssh.ClientConfig{
User: cfg.User,
Auth: []ssh.AuthMethod{
// Use the PublicKeys method for remote authentication.
ssh.PublicKeys(signer),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}

return &SSHExecutor{
step: step,
config: cfg,
sshConfig: sshConfig,
stdout: os.Stdout,
}, nil
}

// referenced code:
// https://go.googlesource.com/crypto/+/master/ssh/example_test.go
// https://gist.github.com/boyzhujian/73b5ecd37efd6f8dd38f56e7588f1b58
func getPublicKeySigner(path string) (ssh.Signer, error) {
// A public key may be used to authenticate against the remote
// server by using an unencrypted PEM-encoded private key file.
//
// If you have an encrypted private key, the crypto/x509 package
// can be used to decrypt it.
key, err := os.ReadFile(path)
if err != nil {
return nil, err
}

// Create the Signer for this private key.
signer, err := ssh.ParsePrivateKey(key)
if err != nil {
return nil, err
}

return signer, nil
}

func init() {
Register("ssh", CreateSSHExecutor)
}

0 comments on commit c33702f

Please sign in to comment.