-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Backup & Restore #2053
Merged
Merged
Backup & Restore #2053
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
0461f40
Add SnapshotWriter.
benbjohnson 963d277
Add "influxd backup" command.
benbjohnson 11c808f
Add restore and bootstrap.
benbjohnson 3befa12
Merge branch 'master' of https://github.com/influxdb/influxdb into ba…
benbjohnson 29cb550
Code review fixes.
benbjohnson 2401e69
Add incremental backups.
benbjohnson 4bc92c3
Code review fixes.
benbjohnson 21782c0
Merge branch 'master' of https://github.com/influxdb/influxdb into ba…
benbjohnson File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,182 @@ | ||
package main | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"flag" | ||
"fmt" | ||
"io" | ||
"log" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
|
||
"github.com/influxdb/influxdb" | ||
) | ||
|
||
// BackupSuffix is a suffix added to the backup while it's in-process. | ||
const BackupSuffix = ".pending" | ||
|
||
// BackupCommand represents the program execution for "influxd backup". | ||
type BackupCommand struct { | ||
// The logger passed to the ticker during execution. | ||
Logger *log.Logger | ||
|
||
// Standard input/output, overridden for testing. | ||
Stderr io.Writer | ||
} | ||
|
||
// NewBackupCommand returns a new instance of BackupCommand with default settings. | ||
func NewBackupCommand() *BackupCommand { | ||
return &BackupCommand{ | ||
Stderr: os.Stderr, | ||
} | ||
} | ||
|
||
// Run excutes the program. | ||
func (cmd *BackupCommand) Run(args ...string) error { | ||
// Set up logger. | ||
cmd.Logger = log.New(cmd.Stderr, "", log.LstdFlags) | ||
cmd.Logger.Printf("influxdb backup, version %s, commit %s", version, commit) | ||
|
||
// Parse command line arguments. | ||
u, path, err := cmd.parseFlags(args) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Retrieve snapshot from local file. | ||
ss, err := influxdb.ReadFileSnapshot(path) | ||
if err != nil && !os.IsNotExist(err) { | ||
return fmt.Errorf("read file snapshot: %s", err) | ||
} | ||
|
||
// Determine temporary path to download to. | ||
tmppath := path + BackupSuffix | ||
|
||
// Calculate path of next backup file. | ||
// This uses the path if it doesn't exist. | ||
// Otherwise it appends an autoincrementing number. | ||
path, err = cmd.nextPath(path) | ||
if err != nil { | ||
return fmt.Errorf("next path: %s", err) | ||
} | ||
|
||
// Retrieve snapshot. | ||
if err := cmd.download(u, ss, tmppath); err != nil { | ||
return fmt.Errorf("download: %s", err) | ||
} | ||
|
||
// Rename temporary file to final path. | ||
if err := os.Rename(tmppath, path); err != nil { | ||
return fmt.Errorf("rename: %s", err) | ||
} | ||
|
||
// TODO: Check file integrity. | ||
|
||
// Notify user of completion. | ||
cmd.Logger.Println("backup complete") | ||
|
||
return nil | ||
} | ||
|
||
// parseFlags parses and validates the command line arguments. | ||
func (cmd *BackupCommand) parseFlags(args []string) (url.URL, string, error) { | ||
fs := flag.NewFlagSet("", flag.ContinueOnError) | ||
host := fs.String("host", DefaultSnapshotURL.String(), "") | ||
fs.SetOutput(cmd.Stderr) | ||
fs.Usage = cmd.printUsage | ||
if err := fs.Parse(args); err != nil { | ||
return url.URL{}, "", err | ||
} | ||
|
||
// Parse host. | ||
u, err := url.Parse(*host) | ||
if err != nil { | ||
return url.URL{}, "", fmt.Errorf("parse host url: %s", err) | ||
} | ||
|
||
// Require output path. | ||
path := fs.Arg(0) | ||
if path == "" { | ||
return url.URL{}, "", fmt.Errorf("snapshot path required") | ||
} | ||
|
||
return *u, path, nil | ||
} | ||
|
||
// nextPath returns the next file to write to. | ||
func (cmd *BackupCommand) nextPath(path string) (string, error) { | ||
// Use base path if it doesn't exist. | ||
if _, err := os.Stat(path); os.IsNotExist(err) { | ||
return path, nil | ||
} else if err != nil { | ||
return "", err | ||
} | ||
|
||
// Otherwise iterate through incremental files until one is available. | ||
for i := 0; ; i++ { | ||
s := fmt.Sprintf(path+".%d", i) | ||
if _, err := os.Stat(s); os.IsNotExist(err) { | ||
return s, nil | ||
} else if err != nil { | ||
return "", err | ||
} | ||
} | ||
} | ||
|
||
// download downloads a snapshot from a host to a given path. | ||
func (cmd *BackupCommand) download(u url.URL, ss *influxdb.Snapshot, path string) error { | ||
// Create local file to write to. | ||
f, err := os.Create(path) | ||
if err != nil { | ||
return fmt.Errorf("open temp file: %s", err) | ||
} | ||
defer f.Close() | ||
|
||
// Encode snapshot. | ||
var buf bytes.Buffer | ||
if ss != nil { | ||
if err := json.NewEncoder(&buf).Encode(ss); err != nil { | ||
return fmt.Errorf("encode snapshot: %s", err) | ||
} | ||
} | ||
|
||
// Create request with existing snapshot as the body. | ||
u.Path = "/snapshot" | ||
req, err := http.NewRequest("GET", u.String(), &buf) | ||
if err != nil { | ||
return fmt.Errorf("new request: %s", err) | ||
} | ||
|
||
// Fetch the archive from the server. | ||
resp, err := http.DefaultClient.Do(req) | ||
if err != nil { | ||
return fmt.Errorf("get: %s", err) | ||
} | ||
defer func() { _ = resp.Body.Close() }() | ||
|
||
// Check the status code. | ||
if resp.StatusCode != http.StatusOK { | ||
return fmt.Errorf("snapshot error: status=%d", resp.StatusCode) | ||
} | ||
|
||
// Write the archive to disk. | ||
if _, err := io.Copy(f, resp.Body); err != nil { | ||
fmt.Errorf("write snapshot: %s", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// printUsage prints the usage message to STDERR. | ||
func (cmd *BackupCommand) printUsage() { | ||
fmt.Fprintf(cmd.Stderr, `usage: influxd backup [flags] PATH | ||
|
||
backup downloads a snapshot of a data node and saves it to disk. | ||
|
||
-host <url> | ||
The host to connect to snapshot. | ||
Defaults to http://127.0.0.1:8087. | ||
`) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
package main_test | ||
|
||
import ( | ||
"bytes" | ||
"net/http" | ||
"net/http/httptest" | ||
"os" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/influxdb/influxdb" | ||
"github.com/influxdb/influxdb/cmd/influxd" | ||
) | ||
|
||
// Ensure the backup can download from the server and save to disk. | ||
func TestBackupCommand(t *testing.T) { | ||
// Mock the backup endpoint. | ||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
if r.URL.Path != "/snapshot" { | ||
t.Fatalf("unexpected url path: %s", r.URL.Path) | ||
} | ||
|
||
// Write a simple snapshot to the buffer. | ||
sw := influxdb.NewSnapshotWriter() | ||
sw.Snapshot = &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ | ||
{Name: "meta", Size: 5, Index: 10}, | ||
}} | ||
sw.FileWriters["meta"] = influxdb.NopWriteToCloser(bytes.NewBufferString("55555")) | ||
if _, err := sw.WriteTo(w); err != nil { | ||
t.Fatal(err) | ||
} | ||
})) | ||
defer s.Close() | ||
|
||
// Create a temp path and remove incremental backups at the end. | ||
path := tempfile() | ||
defer os.Remove(path) | ||
defer os.Remove(path) | ||
defer os.Remove(path) | ||
|
||
// Execute the backup against the mock server. | ||
for i := 0; i < 3; i++ { | ||
if err := NewBackupCommand().Run("-host", s.URL, path); err != nil { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
// Verify snapshot and two incremental snapshots were written. | ||
if _, err := os.Stat(path); err != nil { | ||
t.Fatalf("snapshot not found: %s", err) | ||
} else if _, err = os.Stat(path + ".0"); err != nil { | ||
t.Fatalf("incremental snapshot(0) not found: %s", err) | ||
} else if _, err = os.Stat(path + ".1"); err != nil { | ||
t.Fatalf("incremental snapshot(1) not found: %s", err) | ||
} | ||
} | ||
|
||
// Ensure the backup command returns an error if flags cannot be parsed. | ||
func TestBackupCommand_ErrFlagParse(t *testing.T) { | ||
cmd := NewBackupCommand() | ||
if err := cmd.Run("-bad-flag"); err == nil || err.Error() != `flag provided but not defined: -bad-flag` { | ||
t.Fatal(err) | ||
} else if !strings.Contains(cmd.Stderr.String(), "usage") { | ||
t.Fatal("usage message not displayed") | ||
} | ||
} | ||
|
||
// Ensure the backup command returns an error if the host cannot be parsed. | ||
func TestBackupCommand_ErrInvalidHostURL(t *testing.T) { | ||
if err := NewBackupCommand().Run("-host", "http://%f"); err == nil || err.Error() != `parse host url: parse http://%f: hexadecimal escape in host` { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
// Ensure the backup command returns an error if the output path is not specified. | ||
func TestBackupCommand_ErrPathRequired(t *testing.T) { | ||
if err := NewBackupCommand().Run("-host", "//localhost"); err == nil || err.Error() != `snapshot path required` { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
// Ensure the backup returns an error if it cannot connect to the server. | ||
func TestBackupCommand_ErrConnectionRefused(t *testing.T) { | ||
// Start and immediately stop a server so we have a dead port. | ||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) | ||
s.Close() | ||
|
||
// Execute the backup command. | ||
path := tempfile() | ||
defer os.Remove(path) | ||
if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || !strings.Contains(err.Error(), `connection refused`) { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
// Ensure the backup returns any non-200 status codes. | ||
func TestBackupCommand_ErrServerError(t *testing.T) { | ||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||
w.WriteHeader(http.StatusInternalServerError) | ||
})) | ||
defer s.Close() | ||
|
||
// Execute the backup command. | ||
path := tempfile() | ||
defer os.Remove(path) | ||
if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || err.Error() != `download: snapshot error: status=500` { | ||
t.Fatal(err) | ||
} | ||
} | ||
|
||
// BackupCommand is a test wrapper for main.BackupCommand. | ||
type BackupCommand struct { | ||
*main.BackupCommand | ||
Stderr bytes.Buffer | ||
} | ||
|
||
// NewBackupCommand returns a new instance of BackupCommand. | ||
func NewBackupCommand() *BackupCommand { | ||
cmd := &BackupCommand{BackupCommand: main.NewBackupCommand()} | ||
cmd.BackupCommand.Stderr = &cmd.Stderr | ||
return cmd | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -38,6 +38,12 @@ const ( | |
// DefaultDataPort represents the default port the data server runs on. | ||
DefaultDataPort = 8086 | ||
|
||
// DefaultSnapshotBindAddress is the default bind address to serve snapshots from. | ||
DefaultSnapshotBindAddress = "127.0.0.1" | ||
|
||
// DefaultSnapshotPort is the default port to serve snapshots from. | ||
DefaultSnapshotPort = 8087 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Huh? The help output says 8086. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed: 4bc92c3 |
||
|
||
// DefaultJoinURLs represents the default URLs for joining a cluster. | ||
DefaultJoinURLs = "" | ||
|
||
|
@@ -49,6 +55,11 @@ const ( | |
DefaultGraphiteDatabaseName = "graphite" | ||
) | ||
|
||
var DefaultSnapshotURL = url.URL{ | ||
Scheme: "http", | ||
Host: net.JoinHostPort(DefaultSnapshotBindAddress, strconv.Itoa(DefaultSnapshotPort)), | ||
} | ||
|
||
// Config represents the configuration format for the influxd binary. | ||
type Config struct { | ||
Hostname string `toml:"hostname"` | ||
|
@@ -101,6 +112,10 @@ type Config struct { | |
RetentionCreatePeriod Duration `toml:"retention-create-period"` | ||
} `toml:"data"` | ||
|
||
Snapshot struct { | ||
BindAddress string `toml:"bind-address"` | ||
Port int `toml:"port"` | ||
} | ||
Cluster struct { | ||
Dir string `toml:"dir"` | ||
} `toml:"cluster"` | ||
|
@@ -168,6 +183,8 @@ func NewConfig() (*Config, error) { | |
c.Data.RetentionCheckEnabled = true | ||
c.Data.RetentionCheckPeriod = Duration(10 * time.Minute) | ||
c.Data.RetentionCreatePeriod = Duration(DefaultRetentionCreatePeriod) | ||
c.Snapshot.BindAddress = DefaultSnapshotBindAddress | ||
c.Snapshot.Port = DefaultSnapshotPort | ||
c.Admin.Enabled = true | ||
c.Admin.Port = 8083 | ||
c.ContinuousQuery.RecomputePreviousN = 2 | ||
|
@@ -215,6 +232,11 @@ func (c *Config) DataURL() url.URL { | |
} | ||
} | ||
|
||
// SnapshotAddr returns the TCP binding address for the snapshot handler. | ||
func (c *Config) SnapshotAddr() string { | ||
return net.JoinHostPort(c.Snapshot.BindAddress, strconv.Itoa(c.Snapshot.Port)) | ||
} | ||
|
||
// BrokerAddr returns the binding address the Broker server | ||
func (c *Config) BrokerAddr() string { | ||
return fmt.Sprintf("%s:%d", c.BindAddress, c.Broker.Port) | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: would this help output be more accurate if it read
-host <host:port>
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fact that it is used as part of a URL could be considered an implementation detail, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We parse it as a URL so it needs to specify scheme as well (
http
orhttps
).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then the default shown should include the scheme, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed: 4bc92c3