Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backup & Restore #2053

Merged
merged 8 commits into from
Mar 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 182 additions & 0 deletions cmd/influxd/backup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package main

import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"

"github.com/influxdb/influxdb"
)

// BackupSuffix is a suffix added to the backup while it's in-process.
const BackupSuffix = ".pending"

// BackupCommand represents the program execution for "influxd backup".
type BackupCommand struct {
// The logger passed to the ticker during execution.
Logger *log.Logger

// Standard input/output, overridden for testing.
Stderr io.Writer
}

// NewBackupCommand returns a new instance of BackupCommand with default settings.
func NewBackupCommand() *BackupCommand {
return &BackupCommand{
Stderr: os.Stderr,
}
}

// Run excutes the program.
func (cmd *BackupCommand) Run(args ...string) error {
// Set up logger.
cmd.Logger = log.New(cmd.Stderr, "", log.LstdFlags)
cmd.Logger.Printf("influxdb backup, version %s, commit %s", version, commit)

// Parse command line arguments.
u, path, err := cmd.parseFlags(args)
if err != nil {
return err
}

// Retrieve snapshot from local file.
ss, err := influxdb.ReadFileSnapshot(path)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("read file snapshot: %s", err)
}

// Determine temporary path to download to.
tmppath := path + BackupSuffix

// Calculate path of next backup file.
// This uses the path if it doesn't exist.
// Otherwise it appends an autoincrementing number.
path, err = cmd.nextPath(path)
if err != nil {
return fmt.Errorf("next path: %s", err)
}

// Retrieve snapshot.
if err := cmd.download(u, ss, tmppath); err != nil {
return fmt.Errorf("download: %s", err)
}

// Rename temporary file to final path.
if err := os.Rename(tmppath, path); err != nil {
return fmt.Errorf("rename: %s", err)
}

// TODO: Check file integrity.

// Notify user of completion.
cmd.Logger.Println("backup complete")

return nil
}

// parseFlags parses and validates the command line arguments.
func (cmd *BackupCommand) parseFlags(args []string) (url.URL, string, error) {
fs := flag.NewFlagSet("", flag.ContinueOnError)
host := fs.String("host", DefaultSnapshotURL.String(), "")
fs.SetOutput(cmd.Stderr)
fs.Usage = cmd.printUsage
if err := fs.Parse(args); err != nil {
return url.URL{}, "", err
}

// Parse host.
u, err := url.Parse(*host)
if err != nil {
return url.URL{}, "", fmt.Errorf("parse host url: %s", err)
}

// Require output path.
path := fs.Arg(0)
if path == "" {
return url.URL{}, "", fmt.Errorf("snapshot path required")
}

return *u, path, nil
}

// nextPath returns the next file to write to.
func (cmd *BackupCommand) nextPath(path string) (string, error) {
// Use base path if it doesn't exist.
if _, err := os.Stat(path); os.IsNotExist(err) {
return path, nil
} else if err != nil {
return "", err
}

// Otherwise iterate through incremental files until one is available.
for i := 0; ; i++ {
s := fmt.Sprintf(path+".%d", i)
if _, err := os.Stat(s); os.IsNotExist(err) {
return s, nil
} else if err != nil {
return "", err
}
}
}

// download downloads a snapshot from a host to a given path.
func (cmd *BackupCommand) download(u url.URL, ss *influxdb.Snapshot, path string) error {
// Create local file to write to.
f, err := os.Create(path)
if err != nil {
return fmt.Errorf("open temp file: %s", err)
}
defer f.Close()

// Encode snapshot.
var buf bytes.Buffer
if ss != nil {
if err := json.NewEncoder(&buf).Encode(ss); err != nil {
return fmt.Errorf("encode snapshot: %s", err)
}
}

// Create request with existing snapshot as the body.
u.Path = "/snapshot"
req, err := http.NewRequest("GET", u.String(), &buf)
if err != nil {
return fmt.Errorf("new request: %s", err)
}

// Fetch the archive from the server.
resp, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("get: %s", err)
}
defer func() { _ = resp.Body.Close() }()

// Check the status code.
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("snapshot error: status=%d", resp.StatusCode)
}

// Write the archive to disk.
if _, err := io.Copy(f, resp.Body); err != nil {
fmt.Errorf("write snapshot: %s", err)
}

return nil
}

// printUsage prints the usage message to STDERR.
func (cmd *BackupCommand) printUsage() {
fmt.Fprintf(cmd.Stderr, `usage: influxd backup [flags] PATH

backup downloads a snapshot of a data node and saves it to disk.

-host <url>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: would this help output be more accurate if it read -host <host:port> ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that it is used as part of a URL could be considered an implementation detail, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We parse it as a URL so it needs to specify scheme as well (http or https).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then the default shown should include the scheme, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: 4bc92c3

The host to connect to snapshot.
Defaults to http://127.0.0.1:8087.
`)
}
122 changes: 122 additions & 0 deletions cmd/influxd/backup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main_test

import (
"bytes"
"net/http"
"net/http/httptest"
"os"
"strings"
"testing"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/cmd/influxd"
)

// Ensure the backup can download from the server and save to disk.
func TestBackupCommand(t *testing.T) {
// Mock the backup endpoint.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/snapshot" {
t.Fatalf("unexpected url path: %s", r.URL.Path)
}

// Write a simple snapshot to the buffer.
sw := influxdb.NewSnapshotWriter()
sw.Snapshot = &influxdb.Snapshot{Files: []influxdb.SnapshotFile{
{Name: "meta", Size: 5, Index: 10},
}}
sw.FileWriters["meta"] = influxdb.NopWriteToCloser(bytes.NewBufferString("55555"))
if _, err := sw.WriteTo(w); err != nil {
t.Fatal(err)
}
}))
defer s.Close()

// Create a temp path and remove incremental backups at the end.
path := tempfile()
defer os.Remove(path)
defer os.Remove(path)
defer os.Remove(path)

// Execute the backup against the mock server.
for i := 0; i < 3; i++ {
if err := NewBackupCommand().Run("-host", s.URL, path); err != nil {
t.Fatal(err)
}
}

// Verify snapshot and two incremental snapshots were written.
if _, err := os.Stat(path); err != nil {
t.Fatalf("snapshot not found: %s", err)
} else if _, err = os.Stat(path + ".0"); err != nil {
t.Fatalf("incremental snapshot(0) not found: %s", err)
} else if _, err = os.Stat(path + ".1"); err != nil {
t.Fatalf("incremental snapshot(1) not found: %s", err)
}
}

// Ensure the backup command returns an error if flags cannot be parsed.
func TestBackupCommand_ErrFlagParse(t *testing.T) {
cmd := NewBackupCommand()
if err := cmd.Run("-bad-flag"); err == nil || err.Error() != `flag provided but not defined: -bad-flag` {
t.Fatal(err)
} else if !strings.Contains(cmd.Stderr.String(), "usage") {
t.Fatal("usage message not displayed")
}
}

// Ensure the backup command returns an error if the host cannot be parsed.
func TestBackupCommand_ErrInvalidHostURL(t *testing.T) {
if err := NewBackupCommand().Run("-host", "http://%f"); err == nil || err.Error() != `parse host url: parse http://%f: hexadecimal escape in host` {
t.Fatal(err)
}
}

// Ensure the backup command returns an error if the output path is not specified.
func TestBackupCommand_ErrPathRequired(t *testing.T) {
if err := NewBackupCommand().Run("-host", "//localhost"); err == nil || err.Error() != `snapshot path required` {
t.Fatal(err)
}
}

// Ensure the backup returns an error if it cannot connect to the server.
func TestBackupCommand_ErrConnectionRefused(t *testing.T) {
// Start and immediately stop a server so we have a dead port.
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
s.Close()

// Execute the backup command.
path := tempfile()
defer os.Remove(path)
if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || !strings.Contains(err.Error(), `connection refused`) {
t.Fatal(err)
}
}

// Ensure the backup returns any non-200 status codes.
func TestBackupCommand_ErrServerError(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer s.Close()

// Execute the backup command.
path := tempfile()
defer os.Remove(path)
if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || err.Error() != `download: snapshot error: status=500` {
t.Fatal(err)
}
}

// BackupCommand is a test wrapper for main.BackupCommand.
type BackupCommand struct {
*main.BackupCommand
Stderr bytes.Buffer
}

// NewBackupCommand returns a new instance of BackupCommand.
func NewBackupCommand() *BackupCommand {
cmd := &BackupCommand{BackupCommand: main.NewBackupCommand()}
cmd.BackupCommand.Stderr = &cmd.Stderr
return cmd
}
22 changes: 22 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ const (
// DefaultDataPort represents the default port the data server runs on.
DefaultDataPort = 8086

// DefaultSnapshotBindAddress is the default bind address to serve snapshots from.
DefaultSnapshotBindAddress = "127.0.0.1"

// DefaultSnapshotPort is the default port to serve snapshots from.
DefaultSnapshotPort = 8087
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh? The help output says 8086.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: 4bc92c3


// DefaultJoinURLs represents the default URLs for joining a cluster.
DefaultJoinURLs = ""

Expand All @@ -49,6 +55,11 @@ const (
DefaultGraphiteDatabaseName = "graphite"
)

var DefaultSnapshotURL = url.URL{
Scheme: "http",
Host: net.JoinHostPort(DefaultSnapshotBindAddress, strconv.Itoa(DefaultSnapshotPort)),
}

// Config represents the configuration format for the influxd binary.
type Config struct {
Hostname string `toml:"hostname"`
Expand Down Expand Up @@ -101,6 +112,10 @@ type Config struct {
RetentionCreatePeriod Duration `toml:"retention-create-period"`
} `toml:"data"`

Snapshot struct {
BindAddress string `toml:"bind-address"`
Port int `toml:"port"`
}
Cluster struct {
Dir string `toml:"dir"`
} `toml:"cluster"`
Expand Down Expand Up @@ -168,6 +183,8 @@ func NewConfig() (*Config, error) {
c.Data.RetentionCheckEnabled = true
c.Data.RetentionCheckPeriod = Duration(10 * time.Minute)
c.Data.RetentionCreatePeriod = Duration(DefaultRetentionCreatePeriod)
c.Snapshot.BindAddress = DefaultSnapshotBindAddress
c.Snapshot.Port = DefaultSnapshotPort
c.Admin.Enabled = true
c.Admin.Port = 8083
c.ContinuousQuery.RecomputePreviousN = 2
Expand Down Expand Up @@ -215,6 +232,11 @@ func (c *Config) DataURL() url.URL {
}
}

// SnapshotAddr returns the TCP binding address for the snapshot handler.
func (c *Config) SnapshotAddr() string {
return net.JoinHostPort(c.Snapshot.BindAddress, strconv.Itoa(c.Snapshot.Port))
}

// BrokerAddr returns the binding address the Broker server
func (c *Config) BrokerAddr() string {
return fmt.Sprintf("%s:%d", c.BindAddress, c.Broker.Port)
Expand Down
Loading