Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SSH Executor #351

Merged
merged 5 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 31 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,17 @@ It runs <a href="https://en.wikipedia.org/wiki/Directed_acyclic_graph">DAGs (Dir
- Schedule executions of DAGs with Cron expressions
- Define dependencies between related jobs and represent them as a single DAG (unit of execution)

## Features
- Web UI to edit DAGs, view past execution logs and history
- Ability to run/stop/retry DAGs from the Web UI or command
- Support for command execution on remote hosts via SSH
- Ability to send mail notifications on error or success of a DAG
- Various options to define DAG specification (e.g. environment variables, parameters, conditional logic, etc)

## Contents

- [Highlights](#highlights)
- [Features](#features)
- [Contents](#contents)
- [Getting started](#getting-started)
- [Motivation](#motivation)
Expand All @@ -42,7 +50,7 @@ It runs <a href="https://en.wikipedia.org/wiki/Directed_acyclic_graph">DAGs (Dir
- [Install `dagu`](#install-dagu)
- [via Homebrew](#via-homebrew)
- [via Bash script](#via-bash-script)
- [via Docker Image](#via-docker-image)
- [via Docker](#via-docker)
- [via GitHub Release Page](#via-github-release-page)
- [️Quick start](#️quick-start)
- [1. Launch the Web UI](#1-launch-the-web-ui)
Expand All @@ -65,6 +73,7 @@ It runs <a href="https://en.wikipedia.org/wiki/Directed_acyclic_graph">DAGs (Dir
- [Other Available Fields](#other-available-fields)
- [Executor](#executor)
- [HTTP Executor](#http-executor)
- [SSH Executor](#ssh-executor)
- [Admin Configuration](#admin-configuration)
- [Environment Variable](#environment-variable)
- [Sending email notifications](#sending-email-notifications)
Expand Down Expand Up @@ -125,7 +134,7 @@ brew upgrade yohamta/tap/dagu
curl -L https://raw.githubusercontent.com/yohamta/dagu/main/scripts/downloader.sh | bash
```

### via Docker Image
### via Docker

```sh
docker run \
Expand Down Expand Up @@ -450,7 +459,7 @@ Executor is a different way of executing a Step; Executor can be set in the `exe

### HTTP Executor

The HTTP Executor allows you to send arbitrary HTTP requests.
The HTTP Executor allows us to send arbitrary HTTP requests.

```yaml
steps:
Expand All @@ -470,6 +479,24 @@ steps:
}
```

### SSH Executor

The SSH Executor allows us to execute arbitrary command on a remote host.

```yaml
steps:
- name: step1
executor:
type: ssh
config:
user: dagu
ip: XXX.XXX.XXX.XXX
port: 22
key: /Users/dagu/.ssh/private.pem
StrictHostKeyChecking: false
command: /usr/sbin/ifconfig
```

## Admin Configuration

To configure dagu, please create the config file (default path: `~/.dagu/admin.yaml`). All fields are optional.
Expand Down Expand Up @@ -505,7 +532,7 @@ You can configure the dagu's internal work directory by defining `DAGU_HOME` env

## Sending email notifications

Email notifications can be sent when a DAG finished with an error or successfully. To do so, you can set the `smtp` field and related fields in the DAG specs. You can use any email delivery services (e.g., Sendgrid, Mailgun, etc).
Email notifications can be sent when a DAG finished with an error or successfully. To do so, you can set the `smtp` field and related fields in the DAG specs. You can use any email delivery services (e.g. Sendgrid, Mailgun, etc).

```yaml
# Eamil notification settings
Expand Down
7 changes: 4 additions & 3 deletions examples/docker-container.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
steps:
- name: step1
executor:
- type: docker
hostname: localhost
image: node:latest
type: docker
config:
host: localhost
image: node:latest
command: npm init -y
10 changes: 10 additions & 0 deletions examples/ssh-command.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
steps:
- name: step1
executor:
type: ssh
config:
user: ec2-user
ip: "35.78.67.103"
key: /Users/hamadayouta/.ssh/prod-ec2instance-keypair.pem
StrictHostKeyChecking: false
command: /usr/sbin/ifconfig
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ require (
github.com/rivo/uniseg v0.3.4 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/samber/lo v1.27.0
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b
golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/urfave/cli/v2 v2.4.5 h1:AWCiaqBc+38MxX6nJfjRQyyd2Gq50sOan+AEyv/vFhM=
github.com/urfave/cli/v2 v2.4.5/go.mod h1:oDzoM7pVwz6wHn5ogWgFUU1s4VJayeQS+aEZDqXIEJs=
github.com/yohamta/grep v1.0.0 h1:gCz7u8+caSqLNnY7LehatRnMBMTKOd4iiLClznHNcJI=
github.com/yohamta/grep v1.0.0/go.mod h1:WEl5AeArgNwJmGvsEHr0WC4pqm0YWaWz3UId0V5D0hs=
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b h1:huxqepDufQpLLIRXiVkTvnxrzJlpwmIWAObmcCcUFr0=
golang.org/x/crypto v0.0.0-20221005025214-4161e89ecf1b/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 h1:3MTrJm4PyNL9NBqvYDSj3DHl46qQakyfqfWo4jgfaEM=
golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE=
golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
Expand Down
23 changes: 17 additions & 6 deletions internal/dag/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,21 +375,32 @@ func (b *builder) buildStep(variables []string, def *stepDef) (*Step, error) {
case map[interface{}]interface{}:
for k, v := range val {
if k, ok := k.(string); ok {
if v, ok := v.(string); ok {
if k == "type" {
switch k {
case "type":
if v, ok := v.(string); ok {
step.ExecutorConfig.Type = v
} else {
step.ExecutorConfig.Config[k] = v
return nil, fmt.Errorf("invalid value for type: %s", v)
}
case "config":
if v, ok := v.(map[interface{}]interface{}); ok {
for k, v := range v {
if k, ok := k.(string); ok {
step.ExecutorConfig.Config[k] = v
} else {
return nil, fmt.Errorf("invalid key for config: %s", k)
}
}
} else {
return nil, fmt.Errorf("invalid value for config: %s", v)
}
} else {
return nil, fmt.Errorf("invalid value for executor %s", v)
}
} else {
return nil, fmt.Errorf("invalid executor config key %s", k)
}
}
default:
return nil, fmt.Errorf("invalid executor config")
return nil, fmt.Errorf("invalid executor config type: %t values: %v", val, val)
}
}
// TODO: validate executor config
Expand Down
5 changes: 3 additions & 2 deletions internal/dag/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,12 @@ steps:
command: echo 1
executor:
type: http
config: some option
config:
key: value
`,
expectedExec: "http",
expectedConfig: map[string]interface{}{
"config": "some option",
"key": "value",
},
},
}
Expand Down
138 changes: 138 additions & 0 deletions internal/executor/ssh.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package executor

import (
"context"
"fmt"
"io"
"net"
"os"
"strings"

"github.com/mitchellh/mapstructure"
"github.com/yohamta/dagu/internal/dag"
"golang.org/x/crypto/ssh"
)

type SSHConfig struct {
User string
IP string
Port int
Key string
StrictHostKeyChecking bool
}

type SSHExecutor struct {
step *dag.Step
config *SSHConfig
sshConfig *ssh.ClientConfig
stdout io.Writer
addr net.Addr
session *ssh.Session
}

func (e *SSHExecutor) SetStdout(out io.Writer) {
e.stdout = out
}

func (e *SSHExecutor) SetStderr(out io.Writer) {
e.stdout = out
}

func (e *SSHExecutor) Kill(sig os.Signal) error {
if e.session != nil {
return e.session.Close()
}
return nil
}

func (e *SSHExecutor) Run() error {
addr := fmt.Sprintf("%s:%d", e.config.IP, e.config.Port)
conn, err := ssh.Dial("tcp", addr, e.sshConfig)
if err != nil {
return err
}

session, err := conn.NewSession()
if err != nil {
return err
}
e.session = session
defer session.Close()

// Once a Session is created, you can execute a single command on
// the remote side using the Run method.
session.Stdout = e.stdout
session.Stderr = e.stdout
command := strings.Join(append([]string{e.step.Command}, e.step.Args...), " ")
return session.Run(command)
}

func CreateSSHExecutor(ctx context.Context, step *dag.Step) (Executor, error) {
cfg := &SSHConfig{}
md, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{Result: cfg})

if err != nil {
return nil, err
}

if err := md.Decode(step.ExecutorConfig.Config); err != nil {
return nil, err
}

if cfg.Port == 0 {
cfg.Port = 22
}

if cfg.StrictHostKeyChecking {
return nil, fmt.Errorf("StrictHostKeyChecking is not supported yet")
}

// Create the Signer for this private key.
signer, err := getPublicKeySigner(cfg.Key)
if err != nil {
return nil, err
}

sshConfig := &ssh.ClientConfig{
User: cfg.User,
Auth: []ssh.AuthMethod{
// Use the PublicKeys method for remote authentication.
ssh.PublicKeys(signer),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}

return &SSHExecutor{
step: step,
config: cfg,
sshConfig: sshConfig,
stdout: os.Stdout,
}, nil
}

// referenced code:
// https://go.googlesource.com/crypto/+/master/ssh/example_test.go
// https://gist.github.com/boyzhujian/73b5ecd37efd6f8dd38f56e7588f1b58
func getPublicKeySigner(path string) (ssh.Signer, error) {
// A public key may be used to authenticate against the remote
// server by using an unencrypted PEM-encoded private key file.
//
// If you have an encrypted private key, the crypto/x509 package
// can be used to decrypt it.
key, err := os.ReadFile(path)
if err != nil {
return nil, err
}

// Create the Signer for this private key.
signer, err := ssh.ParsePrivateKey(key)
if err != nil {
return nil, err
}

return signer, nil
}

func init() {
Register("ssh", CreateSSHExecutor)
}