From 0461f401f62ed2f86f0820ee42b6ba4455de0231 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Thu, 19 Mar 2015 22:23:52 -0600 Subject: [PATCH 1/6] Add SnapshotWriter. --- cmd/influxd/backup.go | 83 ++++++++++++++ cmd/influxd/backup_test.go | 47 ++++++++ cmd/influxd/main.go | 32 ++++++ cmd/influxd/restore.go | 20 ++++ httpd/handler.go | 25 +++++ httpd/handler_test.go | 27 +++++ server.go | 7 ++ server_test.go | 44 ++++++++ shard.go | 16 ++- snapshot.go | 219 +++++++++++++++++++++++++++++++++++++ snapshot_test.go | 127 +++++++++++++++++++++ 11 files changed, 642 insertions(+), 5 deletions(-) create mode 100644 cmd/influxd/backup.go create mode 100644 cmd/influxd/backup_test.go create mode 100644 cmd/influxd/restore.go create mode 100644 snapshot.go create mode 100644 snapshot_test.go diff --git a/cmd/influxd/backup.go b/cmd/influxd/backup.go new file mode 100644 index 00000000000..f583e6344f0 --- /dev/null +++ b/cmd/influxd/backup.go @@ -0,0 +1,83 @@ +package main + +import ( + "fmt" + "io" + "log" + "net/http" + "net/url" + "os" +) + +// BackupSuffix is a suffix added to the backup while it's in-process. +const BackupSuffix = ".pending" + +func Backup(host, path string) { + log.Printf("influxdb backup, version %s, commit %s", version, commit) + + // Parse host and generate URL. + u, err := url.Parse(host) + if err != nil { + log.Fatalf("host url parse error: %s", err) + } + + // TODO: Check highest index from local version. + + // Create local file to write to. + tmppath := path + BackupSuffix + f, err := os.Create(tmppath) + if err != nil { + log.Fatalf("create temp file: %s", err) + } + defer f.Close() + + // Download snapshot to temp location. + if err := downloadBackup(*u, f); err != nil { + log.Fatalf("download backup: %s", err) + } + + // Rename the archive to its final location. + f.Close() + if err := os.Rename(tmppath, path); err != nil { + log.Fatalf("rename: %s", err) + } + + // Notify user of completion. + log.Print("backup complete") +} + +// downloadBackup downloads a snapshot from a host to a given path. +func downloadBackup(u url.URL, f *os.File) error { + // Fetch the archive from the server. + u.Path = "/backup" + resp, err := http.Get(u.String()) + if err != nil { + return fmt.Errorf("get backup: %s", err) + } + defer func() { _ = resp.Body.Close() }() + + // Check the status code. + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("backup error: status=%d", resp.StatusCode) + } + + // Write the archive to disk. + if _, err := io.Copy(f, resp.Body); err != nil { + fmt.Errorf("write backup: %s", err) + } + + return nil +} + +func printBackupUsage() { + log.Printf(`usage: influxd backup [flags] + +backup downloads a snapshot of a data node and saves it to disk. + + -host + The host to connect to snapshot. + + -output + Path to where the snapshot will be saved. +`) +} diff --git a/cmd/influxd/backup_test.go b/cmd/influxd/backup_test.go new file mode 100644 index 00000000000..00ebf50f3cc --- /dev/null +++ b/cmd/influxd/backup_test.go @@ -0,0 +1,47 @@ +package main_test + +import ( + "archive/tar" + "bytes" + "io/ioutil" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdb/influxdb/cmd/influxd" +) + +// Ensure the backup can download from the server and save to disk. +func TestBackup(t *testing.T) { + // Mock the backup endpoint. 
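+	// A tar archive containing a single small file is enough to exercise the download path end to end.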
+ var buf bytes.Buffer + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/backup" { + t.Fatalf("unexpected url path: %s", r.URL.Path) + } + + // Write to the buffer to verify contents later. + tw := tar.NewWriter(&buf) + tw.WriteHeader(&tar.Header{Name: "foo", Size: 3}) + tw.Write([]byte("bar")) + tw.Close() + + // Write buffer to response. + w.Write(buf.Bytes()) + })) + defer s.Close() + + // Execute the backup against the mock server. + path := tempfile() + main.Backup(s.URL, path) + + // Verify backup was written. + b, err := ioutil.ReadFile(path) + if err != nil { + t.Fatal(err) + } else if !bytes.Equal(buf.Bytes(), b) || true { + actpath := path + ".actual" + ioutil.WriteFile(actpath, buf.Bytes(), 0666) + t.Fatalf("archive mismatch:\n\nexp=%s\n\ngot=%s\n\n", path, actpath) + } +} diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index cd7d6411ed8..cd394965436 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -68,6 +68,10 @@ func main() { execRun(args[1:]) case "": execRun(args) + case "backup": + execBackup(args[1:]) + case "restore": + execRestore(args[1:]) case "version": execVersion(args[1:]) case "help": @@ -121,6 +125,34 @@ func execRun(args []string) { <-(chan struct{})(nil) } +// execBackup connects to a data node and downloads a snapshot. +func execBackup(args []string) { + // Parse command line arguments. + fs := flag.NewFlagSet("", flag.ExitOnError) + var ( + host = fs.String("host", "", "") + path = fs.String("output", "", "") + ) + fs.Usage = printBackupUsage + fs.Parse(args) + + Backup(*host, *path) +} + +// execRestore restores a backup archive to the data directory and bootstraps the broker. +func execRestore(args []string) { + // Parse command line arguments. + fs := flag.NewFlagSet("", flag.ExitOnError) + configPath := fs.String("config", "", "") + fs.Usage = printRestoreUsage + fs.Parse(args) + + // Path to the archive is the first argument. + path := fs.Arg(0) + + Restore(path, *configPath) +} + // execVersion runs the "version" command. // Prints the commit SHA1 if set by the build process. func execVersion(args []string) { diff --git a/cmd/influxd/restore.go b/cmd/influxd/restore.go new file mode 100644 index 00000000000..8eab746cc8b --- /dev/null +++ b/cmd/influxd/restore.go @@ -0,0 +1,20 @@ +package main + +import ( + "log" +) + +func Restore(path, configPath string) { + log.Printf("influxdb archive, version %s, commit %s", version, commit) + panic("not yet implemented") // TODO +} + +func printRestoreUsage() { + log.Printf(`usage: influxd restore [flags] PATH + +restore expands an archive into the data directory specified by the config. + + -config + The path to the configuration file. +`) +} diff --git a/httpd/handler.go b/httpd/handler.go index f1784124d99..0cc1e1549fd 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -1,6 +1,7 @@ package httpd import ( + "archive/tar" "encoding/json" "errors" "fmt" @@ -632,3 +633,27 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler } }) } + +// BackupHandler streams out the metastore and all shard databases as a tar archive. +type BackupHandler struct { + CreateSnapshotWriterFunc func(*influxdb.SnapshotWriter, error) +} + +func (h *BackupHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Retrieve a snapshot from the server. 
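+	// The writer holds open read transactions on the metastore and shards, so it must be closed even when the response fails.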
+ sw, err := h.CreateSnapshotWriter() + if err != nil { + httpError(w, "error creating snapshot writer: "+err.Error(), false, http.StatusInternalServerError) + return + } + defer sw.Close() + + // TODO: Subtract existing snapshot from writer. + + // Write to response. + w.Header().Set("Content-Length", strconv.FormatInt(sw.Size(), 10)) + if _, err := sw.WriteTo(w); err != nil { + httpError(w, "error writing snapshot: "+err.Error(), false, http.StatusInternalServerError) + return + } +} diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 84fbd14ee49..421598f3f4a 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -1454,6 +1454,33 @@ func TestHandler_ProcessContinousQueries(t *testing.T) { } } +// Ensure the backup handler can write a snapshot as a tar archive over HTTP. +func TestBackupHandler(t *testing.T) { + // Mock the snapshot. + var snapshotter BackupHandlerSnapshotter + snapshotter.SnapshotFunc = func() (*influxdb.Snapshot, error) { + + } + + // + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + })) + defer s.Close() + + status, _ := MustHTTP("POST", s.URL+`/process_continuous_queries`, nil, nil, "") + if status != http.StatusAccepted { + t.Fatalf("unexpected status: %d", status) + } +} + +// BackupHandlerSnapshotter is a mock type for the BackupHandler.Snapshotter interface. +type BackupHandlerSnapshotter struct { + SnapshotFunc func() (*influxdb.Snapshot, error) +} + +func (s *BackupHandlerSnapshotter) Snapshot(*influxdb.Snapshot, error) { return s.SnapshotFunc() } + // batchWrite JSON Unmarshal tests // Utility functions for this test suite. diff --git a/server.go b/server.go index 985d15772e5..e8bd8c9d7d0 100644 --- a/server.go +++ b/server.go @@ -3517,3 +3517,10 @@ func (s *Server) reportStats(version string, clusterID uint64) { client := http.Client{Timeout: time.Duration(5 * time.Second)} go client.Post("http://m.influxdb.com:8086/db/reporting/series?u=reporter&p=influxdb", "application/json", data) } + +// CreateSnapshotWriter returns a writer for the current snapshot. +func (s *Server) CreateSnapshotWriter() (*SnapshotWriter, error) { + s.mu.RLock() + defer s.mu.RUnlock() + return createServerSnapshotWriter(s) +} diff --git a/server_test.go b/server_test.go index d5be388d0cf..f2c396b31a6 100644 --- a/server_test.go +++ b/server_test.go @@ -2200,6 +2200,50 @@ func TestServer_RunContinuousQueries(t *testing.T) { verify(3, `{"series":[{"name":"cpu_region","tags":{"region":"us-east"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",25]]},{"name":"cpu_region","tags":{"region":"us-west"},"columns":["time","mean"],"values":[["1970-01-01T00:00:00Z",75]]}]}`) } +// Ensure the server can create a snapshot writer. +func TestServer_CreateSnapshotWriter(t *testing.T) { + c := test.NewMessagingClient() + s := OpenServer(c) + defer s.Close() + + // Write metadata. + s.CreateDatabase("db") + s.CreateRetentionPolicy("db", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour}) + s.CreateUser("susy", "pass", false) + + // Write one point. + index, err := s.WriteSeries("db", "raw", []influxdb.Point{{Name: "cpu", Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(100)}}}) + if err != nil { + t.Fatal(err) + } + time.Sleep(1 * time.Second) // FIX: Sync on shard. + + // Create snapshot writer. + sw, err := s.CreateSnapshotWriter() + if err != nil { + t.Fatal(err) + } + defer sw.Close() + + // Verify snapshot is correct. 
+ // + // NOTE: Sizes and indices here are subject to change. + // They are tracked here so that we can see when they change over time. + if len(sw.Snapshot.Files) != 2 { + t.Fatalf("unexpected file count: %d", len(sw.Snapshot.Files)) + } else if !reflect.DeepEqual(sw.Snapshot.Files[0], influxdb.SnapshotFile{Name: "meta", Size: 45056, Index: 6}) { + t.Fatalf("unexpected file(0): %#v", sw.Snapshot.Files[0]) + } else if !reflect.DeepEqual(sw.Snapshot.Files[1], influxdb.SnapshotFile{Name: "shards/1", Size: 24576, Index: index}) { + t.Fatalf("unexpected file(1): %#v", sw.Snapshot.Files[1]) + } + + // Write to buffer to verify that it does not error or panic. + var buf bytes.Buffer + if _, err := sw.WriteTo(&buf); err != nil { + t.Fatal(err) + } +} + func mustMarshalJSON(v interface{}) string { b, err := json.Marshal(v) if err != nil { diff --git a/shard.go b/shard.go index 60629b4ba52..0cb2eeb5c06 100644 --- a/shard.go +++ b/shard.go @@ -88,15 +88,12 @@ func (s *Shard) open(path string, conn MessagingConn) error { s.store = store // Initialize store. - s.index = 0 if err := s.store.Update(func(tx *bolt.Tx) error { + _, _ = tx.CreateBucketIfNotExists([]byte("meta")) _, _ = tx.CreateBucketIfNotExists([]byte("values")) // Find highest replicated index. - b, _ := tx.CreateBucketIfNotExists([]byte("meta")) - if buf := b.Get([]byte("index")); len(buf) > 0 { - s.index = btou64(buf) - } + s.index = shardMetaIndex(tx) // Open connection. if err := conn.Open(s.index, true); err != nil { @@ -117,6 +114,15 @@ func (s *Shard) open(path string, conn MessagingConn) error { return nil } +// shardMetaIndex returns the index from the "meta" bucket on a transaction. +func shardMetaIndex(tx *bolt.Tx) uint64 { + var index uint64 + if buf := tx.Bucket([]byte("meta")).Get([]byte("index")); len(buf) > 0 { + index = btou64(buf) + } + return index +} + // close shuts down the shard's store. func (s *Shard) close() error { // Wait for goroutines to stop. diff --git a/snapshot.go b/snapshot.go new file mode 100644 index 00000000000..7c84c574864 --- /dev/null +++ b/snapshot.go @@ -0,0 +1,219 @@ +package influxdb + +import ( + "archive/tar" + "encoding/json" + "fmt" + "io" + "path" + "path/filepath" + + "github.com/boltdb/bolt" +) + +// Snapshot represents the state of the Server at a given time. +type Snapshot struct { + Files []SnapshotFile `json:"files"` +} + +// SnapshotFile represents a single file in a Snapshot. +type SnapshotFile struct { + Name string `json:"name"` // filename + Size int64 `json:"size"` // file size + Index uint64 `json:"index"` // highest index applied +} + +// SnapshotWriter writes a snapshot and the underlying files to disk as a tar archive. +// This type is not safe for concurrent use. +type SnapshotWriter struct { + // The snapshot to write from. + // Removing files from the snapshot after creation will cause those files to be ignored. + Snapshot *Snapshot + + // Writers for each file by filename. + // Writers will be closed as they're processed and will close by the end of WriteTo(). + FileWriters map[string]SnapshotFileWriter +} + +// NewSnapshotWriter returns a new instance of SnapshotWriter. +func NewSnapshotWriter() *SnapshotWriter { + return &SnapshotWriter{ + Snapshot: &Snapshot{}, + FileWriters: make(map[string]SnapshotFileWriter), + } +} + +// Close closes all file writers on the snapshot. +func (sw *SnapshotWriter) Close() error { + for _, fw := range sw.FileWriters { + _ = fw.Close() + } + return nil +} + +// closeUnusedWriters closes all file writers not on the snapshot. 
+// This allows transactions on these files to be short lived. +func (sw *SnapshotWriter) closeUnusedWriters() { +loop: + for name, fw := range sw.FileWriters { + // Find writer in snapshot. + for _, f := range sw.Snapshot.Files { + if f.Name == name { + continue loop + } + } + + // If not found then close it. + _ = fw.Close() + } +} + +// WriteTo writes the snapshot to the writer. +// File writers are closed as they are written. +// This function will always return n == 0. +func (sw *SnapshotWriter) WriteTo(w io.Writer) (n int64, err error) { + // Close any file writers that aren't required. + sw.closeUnusedWriters() + + // Begin writing a tar file to the output. + tw := tar.NewWriter(w) + defer tw.Close() + + // Write manifest file. + if err := sw.writeManifestTo(tw); err != nil { + return 0, fmt.Errorf("write manifest: %s", err) + } + + // Write each backup file. + for _, f := range sw.Snapshot.Files { + if err := sw.writeFileTo(tw, &f); err != nil { + return 0, fmt.Errorf("write file: %s", err) + } + } + + // Close tar writer and check error. + if err := tw.Close(); err != nil { + return 0, fmt.Errorf("tar close: %s", err) + } + + return 0, nil +} + +// writeManifestTo writes a manifest of the contents of the snapshot to the archive. +func (sw *SnapshotWriter) writeManifestTo(tw *tar.Writer) error { + // Convert snapshot to JSON. + b, err := json.Marshal(sw.Snapshot) + if err != nil { + return fmt.Errorf("marshal json: %s", err) + } + + // Write header & file. + if err := tw.WriteHeader(&tar.Header{Name: "manifest", Size: int64(len(b))}); err != nil { + return fmt.Errorf("write header: %s", err) + } + if _, err := tw.Write(b); err != nil { + return fmt.Errorf("write: %s", err) + } + + return nil +} + +// writeFileTo writes a single file to the archive. +func (sw *SnapshotWriter) writeFileTo(tw *tar.Writer, f *SnapshotFile) error { + // Retrieve the file writer by filename. + fw := sw.FileWriters[f.Name] + if fw == nil { + return fmt.Errorf("file writer not found: name=%s", f.Name) + } + + // Write file header. + if err := tw.WriteHeader(&tar.Header{ + Name: f.Name, + Size: f.Size, + }); err != nil { + return fmt.Errorf("write header: file=%s, err=%s", f.Name, err) + } + + // Copy the database to the writer. + if nn, err := fw.WriteTo(tw); err != nil { + return fmt.Errorf("write: file=%s, err=%s", f.Name, err) + } else if nn != f.Size { + return fmt.Errorf("short write: file=%s", f.Name) + } + + // Close the writer. + if err := fw.Close(); err != nil { + return fmt.Errorf("close: file=%s, err=%s", f.Name, err) + } + + return nil +} + +// createServerSnapshotWriter creates a snapshot writer from a locked server. +func createServerSnapshotWriter(s *Server) (*SnapshotWriter, error) { + // Exit if the server is closed. + if !s.opened() { + return nil, ErrServerClosed + } + + // Create snapshot writer. + sw := NewSnapshotWriter() + + if err := func() error { + // Create snapshot file for metastore. + tx, err := s.meta.db.Begin(false) + if err != nil { + return fmt.Errorf("metastore begin: %s", err) + } + name := "meta" + sw.Snapshot.Files = append(sw.Snapshot.Files, SnapshotFile{ + Name: name, + Size: tx.Size(), + Index: (&metatx{tx}).index(), + }) + sw.FileWriters[name] = &boltTxCloser{tx} + + // Create files for each shard. + for _, sh := range s.shards { + // Ignore shard if it's not owned by the server. + if sh.store == nil { + continue + } + + // Begin transaction and create snapshot file. 
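+			// Begin(false) opens a read-only bolt transaction; boltTxCloser.Close rolls it back once the file has been written to the archive.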
+ tx, err := sh.store.Begin(false) + if err != nil { + return fmt.Errorf("shard begin: id=%d, err=%s", sh.ID, err) + } + + name := path.Join("shards", filepath.Base(sh.store.Path())) + sw.Snapshot.Files = append(sw.Snapshot.Files, SnapshotFile{ + Name: name, + Size: tx.Size(), + Index: shardMetaIndex(tx), + }) + sw.FileWriters[name] = &boltTxCloser{tx} + } + + return nil + }(); err != nil { + _ = sw.Close() + return nil, err + } + + return sw, nil +} + +// SnapshotFileWriter is the interface used for writing a file to a snapshot. +type SnapshotFileWriter interface { + io.WriterTo + io.Closer +} + +// boltTxCloser wraps a Bolt transaction to implement io.Closer. +type boltTxCloser struct { + *bolt.Tx +} + +// Close rollsback the transaction. +func (tx *boltTxCloser) Close() error { return tx.Rollback() } diff --git a/snapshot_test.go b/snapshot_test.go new file mode 100644 index 00000000000..b7149561471 --- /dev/null +++ b/snapshot_test.go @@ -0,0 +1,127 @@ +package influxdb_test + +import ( + "archive/tar" + "bytes" + "io" + "io/ioutil" + "testing" + + "github.com/influxdb/influxdb" +) + +// Ensure a snapshot writer can write a set of files to an archive +func TestSnapshotWriter(t *testing.T) { + // Create a new writer with a snapshot and file writers. + sw := influxdb.NewSnapshotWriter() + sw.Snapshot.Files = []influxdb.SnapshotFile{ + {Name: "meta", Size: 3, Index: 12}, + {Name: "shards/1", Size: 5, Index: 15}, + } + sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("foo")} + sw.FileWriters["shards/1"] = &bufCloser{Buffer: *bytes.NewBufferString("55555")} + + // Write the snapshot to a buffer. + var buf bytes.Buffer + if _, err := sw.WriteTo(&buf); err != nil { + t.Fatal(err) + } + + // Ensure file writers are closed as they're writing. + if !sw.FileWriters["meta"].(*bufCloser).closed { + t.Fatal("meta file writer not closed") + } else if !sw.FileWriters["shards/1"].(*bufCloser).closed { + t.Fatal("shards/1 file writer not closed") + } + + // Close writer. + if err := sw.Close(); err != nil { + t.Fatal(err) + } + + // Read archive from buffer. + tr := tar.NewReader(&buf) + + // First file should be the manifest. + if hdr, err := tr.Next(); err != nil { + t.Fatalf("unexpected error(manifest): %s", err) + } else if hdr.Name != "manifest" { + t.Fatalf("unexpected header name(manifest): %s", hdr.Name) + } else if hdr.Size != 87 { + t.Fatalf("unexpected header size(manifest): %d", hdr.Size) + } else if b := MustReadAll(tr); string(b) != `{"files":[{"name":"meta","size":3,"index":12},{"name":"shards/1","size":5,"index":15}]}` { + t.Fatalf("unexpected file(manifest): %s", b) + } + + // Next should be the meta file. + if hdr, err := tr.Next(); err != nil { + t.Fatalf("unexpected error(meta): %s", err) + } else if hdr.Name != "meta" { + t.Fatalf("unexpected header name(meta): %s", hdr.Name) + } else if hdr.Size != 3 { + t.Fatalf("unexpected header size(meta): %d", hdr.Size) + } else if b := MustReadAll(tr); string(b) != `foo` { + t.Fatalf("unexpected file(meta): %s", b) + } + + // Next should be the shard file. + if hdr, err := tr.Next(); err != nil { + t.Fatalf("unexpected error(shards/1): %s", err) + } else if hdr.Name != "shards/1" { + t.Fatalf("unexpected header name(shards/1): %s", hdr.Name) + } else if hdr.Size != 5 { + t.Fatalf("unexpected header size(shards/1): %d", hdr.Size) + } else if b := MustReadAll(tr); string(b) != `55555` { + t.Fatalf("unexpected file(shards/1): %s", b) + } + + // Check for end of archive. 
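+	// tar.Reader.Next returns io.EOF once every entry in the archive has been read.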
+ if _, err := tr.Next(); err != io.EOF { + t.Fatalf("expected EOF: %s", err) + } +} + +// Ensure a snapshot writer closes unused file writers. +func TestSnapshotWriter_CloseUnused(t *testing.T) { + // Create a new writer with a snapshot and file writers. + sw := influxdb.NewSnapshotWriter() + sw.Snapshot.Files = []influxdb.SnapshotFile{ + {Name: "meta", Size: 3}, + } + sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("foo")} + sw.FileWriters["other"] = &bufCloser{Buffer: *bytes.NewBufferString("55555")} + + // Write the snapshot to a buffer. + var buf bytes.Buffer + if _, err := sw.WriteTo(&buf); err != nil { + t.Fatal(err) + } + + // Ensure other writer is closed. + // This should happen at the beginning of the write so that it doesn't have + // to wait until the close of the whole writer. + if !sw.FileWriters["other"].(*bufCloser).closed { + t.Fatal("'other' file writer not closed") + } +} + +// bufCloser adds a Close() method to a bytes.Buffer +type bufCloser struct { + bytes.Buffer + closed bool +} + +// Close marks the buffer as closed. +func (b *bufCloser) Close() error { + b.closed = true + return nil +} + +// Reads all data from the reader. Panic on error. +func MustReadAll(r io.Reader) []byte { + b, err := ioutil.ReadAll(r) + if err != nil { + panic(err.Error()) + } + return b +} From 963d277a75d0e27b9e68ff5ad7073d146e9fba2a Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 22 Mar 2015 10:28:04 -0600 Subject: [PATCH 2/6] Add "influxd backup" command. This commit adds the backup command to the influxd binary as well as implements a SnapshotWriter in the influxdb package. By default the snapshot handler binds to 127.0.0.1 so it cannot be accessed outside of the local machine. --- cmd/influxd/backup.go | 105 ++++++++++++++++----- cmd/influxd/backup_test.go | 107 ++++++++++++++++----- cmd/influxd/config.go | 22 +++++ cmd/influxd/main.go | 19 +--- cmd/influxd/run.go | 11 +++ cmd/influxd/server_integration_test.go | 3 + httpd/handler.go | 10 +- httpd/handler_test.go | 55 +++++++---- snapshot.go | 126 +++++++++++++++++++------ snapshot_test.go | 67 +++++++++++++ 10 files changed, 408 insertions(+), 117 deletions(-) diff --git a/cmd/influxd/backup.go b/cmd/influxd/backup.go index f583e6344f0..ac161b42c91 100644 --- a/cmd/influxd/backup.go +++ b/cmd/influxd/backup.go @@ -1,6 +1,7 @@ package main import ( + "flag" "fmt" "io" "log" @@ -12,65 +13,117 @@ import ( // BackupSuffix is a suffix added to the backup while it's in-process. const BackupSuffix = ".pending" -func Backup(host, path string) { - log.Printf("influxdb backup, version %s, commit %s", version, commit) +// BackupCommand represents the program execution for "influxd backup". +type BackupCommand struct { + // The logger passed to the ticker during execution. + Logger *log.Logger - // Parse host and generate URL. - u, err := url.Parse(host) + // Standard input/output, overridden for testing. + Stderr io.Writer +} + +// NewBackupCommand returns a new instance of BackupCommand with default settings. +func NewBackupCommand() *BackupCommand { + return &BackupCommand{ + Stderr: os.Stderr, + } +} + +// Run excutes the program. +func (cmd *BackupCommand) Run(args ...string) error { + // Set up logger. + cmd.Logger = log.New(cmd.Stderr, "", log.LstdFlags) + cmd.Logger.Printf("influxdb backup, version %s, commit %s", version, commit) + + // Parse command line arguments. 
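+	// parseFlags returns the snapshot endpoint URL and the local path the archive will be written to.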
+ u, path, err := cmd.parseFlags(args) if err != nil { - log.Fatalf("host url parse error: %s", err) + return err } // TODO: Check highest index from local version. - // Create local file to write to. + // Determine temporary path to download to. tmppath := path + BackupSuffix - f, err := os.Create(tmppath) - if err != nil { - log.Fatalf("create temp file: %s", err) - } - defer f.Close() - // Download snapshot to temp location. - if err := downloadBackup(*u, f); err != nil { - log.Fatalf("download backup: %s", err) + // Retrieve snapshot. + if err := cmd.download(u, tmppath); err != nil { + return fmt.Errorf("download: %s", err) } - // Rename the archive to its final location. - f.Close() + // Rename temporary file to final path. if err := os.Rename(tmppath, path); err != nil { - log.Fatalf("rename: %s", err) + return fmt.Errorf("rename: %s", err) } + // TODO: Check file integrity. + // Notify user of completion. - log.Print("backup complete") + cmd.Logger.Println("backup complete") + + return nil } -// downloadBackup downloads a snapshot from a host to a given path. -func downloadBackup(u url.URL, f *os.File) error { +// parseFlags parses and validates the command line arguments. +func (cmd *BackupCommand) parseFlags(args []string) (url.URL, string, error) { + fs := flag.NewFlagSet("", flag.ContinueOnError) + var ( + host = fs.String("host", DefaultSnapshotURL.String(), "") + path = fs.String("output", "", "") + ) + fs.SetOutput(cmd.Stderr) + fs.Usage = cmd.printUsage + if err := fs.Parse(args); err != nil { + return url.URL{}, "", err + } + + // Parse host. + u, err := url.Parse(*host) + if err != nil { + return url.URL{}, "", fmt.Errorf("parse host url: %s", err) + } + + // Require output path. + if *path == "" { + return url.URL{}, "", fmt.Errorf("output path required") + } + + return *u, *path, nil +} + +// download downloads a snapshot from a host to a given path. +func (cmd *BackupCommand) download(u url.URL, path string) error { + // Create local file to write to. + f, err := os.Create(path) + if err != nil { + return fmt.Errorf("open temp file: %s", err) + } + defer f.Close() + // Fetch the archive from the server. - u.Path = "/backup" + u.Path = "/snapshot" resp, err := http.Get(u.String()) if err != nil { - return fmt.Errorf("get backup: %s", err) + return fmt.Errorf("get: %s", err) } defer func() { _ = resp.Body.Close() }() // Check the status code. if resp.StatusCode != http.StatusOK { - return fmt.Errorf("backup error: status=%d", resp.StatusCode) + return fmt.Errorf("snapshot error: status=%d", resp.StatusCode) } // Write the archive to disk. if _, err := io.Copy(f, resp.Body); err != nil { - fmt.Errorf("write backup: %s", err) + fmt.Errorf("write snapshot: %s", err) } return nil } -func printBackupUsage() { - log.Printf(`usage: influxd backup [flags] +// printUsage prints the usage message to STDERR. +func (cmd *BackupCommand) printUsage() { + fmt.Fprintf(cmd.Stderr, `usage: influxd backup [flags] backup downloads a snapshot of a data node and saves it to disk. diff --git a/cmd/influxd/backup_test.go b/cmd/influxd/backup_test.go index 00ebf50f3cc..cf75758ec64 100644 --- a/cmd/influxd/backup_test.go +++ b/cmd/influxd/backup_test.go @@ -1,47 +1,112 @@ package main_test import ( - "archive/tar" "bytes" - "io/ioutil" "net/http" "net/http/httptest" + "os" + "strings" "testing" + "github.com/influxdb/influxdb" "github.com/influxdb/influxdb/cmd/influxd" ) // Ensure the backup can download from the server and save to disk. 
-func TestBackup(t *testing.T) { +func TestBackupCommand(t *testing.T) { // Mock the backup endpoint. - var buf bytes.Buffer s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/backup" { + if r.URL.Path != "/snapshot" { t.Fatalf("unexpected url path: %s", r.URL.Path) } - // Write to the buffer to verify contents later. - tw := tar.NewWriter(&buf) - tw.WriteHeader(&tar.Header{Name: "foo", Size: 3}) - tw.Write([]byte("bar")) - tw.Close() - - // Write buffer to response. - w.Write(buf.Bytes()) + // Write a simple snapshot to the buffer. + sw := influxdb.NewSnapshotWriter() + sw.Snapshot = &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "meta", Size: 5, Index: 10}, + }} + sw.FileWriters["meta"] = influxdb.NopWriteToCloser(bytes.NewBufferString("55555")) + if _, err := sw.WriteTo(w); err != nil { + t.Fatal(err) + } })) defer s.Close() // Execute the backup against the mock server. path := tempfile() - main.Backup(s.URL, path) + defer os.Remove(path) + if err := NewBackupCommand().Run("-host", s.URL, "-output", path); err != nil { + t.Fatal(err) + } + + // Verify snapshot was written to path. + if _, err := os.Stat(path); err != nil { + t.Fatalf("snapshot not found: %s", err) + } +} + +// Ensure the backup command returns an error if flags cannot be parsed. +func TestBackupCommand_ErrFlagParse(t *testing.T) { + cmd := NewBackupCommand() + if err := cmd.Run("-bad-flag"); err == nil || err.Error() != `flag provided but not defined: -bad-flag` { + t.Fatal(err) + } else if !strings.Contains(cmd.Stderr.String(), "usage") { + t.Fatal("usage message not displayed") + } +} + +// Ensure the backup command returns an error if the host cannot be parsed. +func TestBackupCommand_ErrInvalidHostURL(t *testing.T) { + if err := NewBackupCommand().Run("-host", "http://%f"); err == nil || err.Error() != `parse host url: parse http://%f: hexadecimal escape in host` { + t.Fatal(err) + } +} + +// Ensure the backup command returns an error if the output path is not specified. +func TestBackupCommand_ErrPathRequired(t *testing.T) { + if err := NewBackupCommand().Run("-host", "//localhost"); err == nil || err.Error() != `output path required` { + t.Fatal(err) + } +} + +// Ensure the backup returns an error if it cannot connect to the server. +func TestBackupCommand_ErrConnectionRefused(t *testing.T) { + // Start and immediate stop a server so we have a dead port. + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) + s.Close() + + // Execute the backup command. + path := tempfile() + defer os.Remove(path) + if err := NewBackupCommand().Run("-host", s.URL, "-output", path); err == nil || !strings.Contains(err.Error(), `connection refused`) { + t.Fatal(err) + } +} - // Verify backup was written. - b, err := ioutil.ReadFile(path) - if err != nil { +// Ensure the backup returns any non-200 status codes. +func TestBackupCommand_ErrServerError(t *testing.T) { + s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer s.Close() + + // Execute the backup command. 
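+	// The 500 response from the mock server should surface as a download error from the command.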
+ path := tempfile() + defer os.Remove(path) + if err := NewBackupCommand().Run("-host", s.URL, "-output", path); err == nil || err.Error() != `download: snapshot error: status=500` { t.Fatal(err) - } else if !bytes.Equal(buf.Bytes(), b) || true { - actpath := path + ".actual" - ioutil.WriteFile(actpath, buf.Bytes(), 0666) - t.Fatalf("archive mismatch:\n\nexp=%s\n\ngot=%s\n\n", path, actpath) } } + +// BackupCommand is a test wrapper for main.BackupCommand. +type BackupCommand struct { + *main.BackupCommand + Stderr bytes.Buffer +} + +// NewBackupCommand returns a new instance of BackupCommand. +func NewBackupCommand() *BackupCommand { + cmd := &BackupCommand{BackupCommand: main.NewBackupCommand()} + cmd.BackupCommand.Stderr = &cmd.Stderr + return cmd +} diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index a711a600873..80160b1d345 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -37,6 +37,12 @@ const ( // DefaultDataPort represents the default port the data server runs on. DefaultDataPort = 8086 + // DefaultSnapshotBindAddress is the default bind address to serve snapshots from. + DefaultSnapshotBindAddress = "127.0.0.1" + + // DefaultSnapshotPort is the default port to serve snapshots from. + DefaultSnapshotPort = 8086 + // DefaultJoinURLs represents the default URLs for joining a cluster. DefaultJoinURLs = "" @@ -48,6 +54,11 @@ const ( DefaultGraphiteDatabaseName = "graphite" ) +var DefaultSnapshotURL = url.URL{ + Scheme: "http", + Host: net.JoinHostPort(DefaultSnapshotBindAddress, strconv.Itoa(DefaultSnapshotPort)), +} + // Config represents the configuration format for the influxd binary. type Config struct { Hostname string `toml:"hostname"` @@ -100,6 +111,10 @@ type Config struct { RetentionCreatePeriod Duration `toml:"retention-create-period"` } `toml:"data"` + Snapshot struct { + BindAddress string `toml:"bind-address"` + Port int `toml:"port"` + } Cluster struct { Dir string `toml:"dir"` } `toml:"cluster"` @@ -164,6 +179,8 @@ func NewConfig() *Config { c.Data.RetentionCheckEnabled = true c.Data.RetentionCheckPeriod = Duration(10 * time.Minute) c.Data.RetentionCreatePeriod = Duration(DefaultRetentionCreatePeriod) + c.Snapshot.BindAddress = DefaultSnapshotBindAddress + c.Snapshot.Port = DefaultSnapshotPort c.Admin.Enabled = true c.Admin.Port = 8083 c.ContinuousQuery.RecomputePreviousN = 2 @@ -211,6 +228,11 @@ func (c *Config) DataURL() url.URL { } } +// SnapshotAddr returns the TCP binding address for the snapshot handler. +func (c *Config) SnapshotAddr() string { + return net.JoinHostPort(c.Snapshot.BindAddress, strconv.Itoa(c.Snapshot.Port)) +} + // BrokerAddr returns the binding address the Broker server func (c *Config) BrokerAddr() string { return fmt.Sprintf("%s:%d", c.BindAddress, c.Broker.Port) diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index cd394965436..39f52adab14 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -69,7 +69,10 @@ func main() { case "": execRun(args) case "backup": - execBackup(args[1:]) + cmd := NewBackupCommand() + if err := cmd.Run(args[1:]...); err != nil { + log.Fatalf("backup: %s", err) + } case "restore": execRestore(args[1:]) case "version": @@ -125,20 +128,6 @@ func execRun(args []string) { <-(chan struct{})(nil) } -// execBackup connects to a data node and downloads a snapshot. -func execBackup(args []string) { - // Parse command line arguments. 
- fs := flag.NewFlagSet("", flag.ExitOnError) - var ( - host = fs.String("host", "", "") - path = fs.String("output", "", "") - ) - fs.Usage = printBackupUsage - fs.Parse(args) - - Backup(*host, *path) -} - // execRestore restores a backup archive to the data directory and bootstraps the broker. func execRestore(args []string) { // Parse command line arguments. diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index e333e8933a8..16cceb8bdb5 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -123,6 +123,17 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B } log.Printf("data node #%d listening on %s", s.ID(), config.DataAddr()) + // Start snapshot handler. + go func() { + log.Fatal(http.ListenAndServe( + config.SnapshotAddr(), + &httpd.SnapshotHandler{ + CreateSnapshotWriter: s.CreateSnapshotWriter, + }, + )) + }() + log.Printf("snapshot endpoint listening on %s", config.SnapshotAddr()) + // Start the admin interface on the default port if config.Admin.Enabled { port := fmt.Sprintf(":%d", config.Admin.Port) diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index 8a1575b4003..c5b08d6c94c 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -1169,3 +1169,6 @@ func mustMarshalJSON(v interface{}) string { return string(b) } + +func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } +func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) } diff --git a/httpd/handler.go b/httpd/handler.go index 0cc1e1549fd..42e9cec305d 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -1,7 +1,6 @@ package httpd import ( - "archive/tar" "encoding/json" "errors" "fmt" @@ -634,12 +633,12 @@ func recovery(inner http.Handler, name string, weblog *log.Logger) http.Handler }) } -// BackupHandler streams out the metastore and all shard databases as a tar archive. -type BackupHandler struct { - CreateSnapshotWriterFunc func(*influxdb.SnapshotWriter, error) +// SnapshotHandler streams out a snapshot from the server. +type SnapshotHandler struct { + CreateSnapshotWriter func() (*influxdb.SnapshotWriter, error) } -func (h *BackupHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Retrieve a snapshot from the server. sw, err := h.CreateSnapshotWriter() if err != nil { @@ -651,7 +650,6 @@ func (h *BackupHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // TODO: Subtract existing snapshot from writer. // Write to response. - w.Header().Set("Content-Length", strconv.FormatInt(sw.Size(), 10)) if _, err := sw.WriteTo(w); err != nil { httpError(w, "error writing snapshot: "+err.Error(), false, http.StatusInternalServerError) return diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 421598f3f4a..1f5f911b2f9 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -1,10 +1,12 @@ package httpd_test import ( + "archive/tar" "bytes" "encoding/base64" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/http/httptest" @@ -1454,33 +1456,46 @@ func TestHandler_ProcessContinousQueries(t *testing.T) { } } -// Ensure the backup handler can write a snapshot as a tar archive over HTTP. -func TestBackupHandler(t *testing.T) { - // Mock the snapshot. - var snapshotter BackupHandlerSnapshotter - snapshotter.SnapshotFunc = func() (*influxdb.Snapshot, error) { - +// Ensure the snapshot handler can write a snapshot as a tar archive over HTTP. 
+func TestSnapshotHandler(t *testing.T) { + // Create handler and mock the snapshot creator. + var h httpd.SnapshotHandler + h.CreateSnapshotWriter = func() (*influxdb.SnapshotWriter, error) { + return &influxdb.SnapshotWriter{ + Snapshot: &influxdb.Snapshot{ + Files: []influxdb.SnapshotFile{{Name: "meta", Size: 5, Index: 12}}, + }, + FileWriters: map[string]influxdb.SnapshotFileWriter{ + "meta": influxdb.NopWriteToCloser(bytes.NewBufferString("55555")), + }, + }, nil } - // - s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - - })) - defer s.Close() + // Execute handler. + w := httptest.NewRecorder() + h.ServeHTTP(w, nil) - status, _ := MustHTTP("POST", s.URL+`/process_continuous_queries`, nil, nil, "") - if status != http.StatusAccepted { - t.Fatalf("unexpected status: %d", status) + // Verify snapshot was to response. + if w.Code != http.StatusOK { + t.Fatalf("unexpected status: %d", w.Code) + } else if w.Body == nil { + t.Fatal("body not written") } -} -// BackupHandlerSnapshotter is a mock type for the BackupHandler.Snapshotter interface. -type BackupHandlerSnapshotter struct { - SnapshotFunc func() (*influxdb.Snapshot, error) + // Read snapshot. + tr := tar.NewReader(w.Body) + if hdr, err := tr.Next(); err != nil { + t.Fatal(err) + } else if hdr.Name != "manifest" { + t.Fatalf("unexpected snapshot file: %s", hdr.Name) + } + if b, err := ioutil.ReadAll(tr); err != nil { + t.Fatal(err) + } else if string(b) != `{"files":[{"name":"meta","size":5,"index":12}]}` { + t.Fatalf("unexpected manifest: %s", b) + } } -func (s *BackupHandlerSnapshotter) Snapshot(*influxdb.Snapshot, error) { return s.SnapshotFunc() } - // batchWrite JSON Unmarshal tests // Utility functions for this test suite. diff --git a/snapshot.go b/snapshot.go index 7c84c574864..c602db7fe4c 100644 --- a/snapshot.go +++ b/snapshot.go @@ -7,6 +7,7 @@ import ( "io" "path" "path/filepath" + "time" "github.com/boltdb/bolt" ) @@ -16,6 +17,32 @@ type Snapshot struct { Files []SnapshotFile `json:"files"` } +// Diff returns a Snapshot of files that are newer in s than other. +func (s *Snapshot) Diff(other *Snapshot) *Snapshot { + diff := &Snapshot{} + + // Find versions of files that are newer in s. +loop: + for _, a := range s.Files { + // Try to find a newer version of the file in other. + // If found then don't append this file and move to the next file. + for _, b := range other.Files { + if a.Name != b.Name { + continue + } else if a.Index <= b.Index { + continue loop + } else { + break + } + } + + // Append the newest version. + diff.Files = append(diff.Files, a) + } + + return diff +} + // SnapshotFile represents a single file in a Snapshot. type SnapshotFile struct { Name string `json:"name"` // filename @@ -108,7 +135,12 @@ func (sw *SnapshotWriter) writeManifestTo(tw *tar.Writer) error { } // Write header & file. - if err := tw.WriteHeader(&tar.Header{Name: "manifest", Size: int64(len(b))}); err != nil { + if err := tw.WriteHeader(&tar.Header{ + Name: "manifest", + Size: int64(len(b)), + Mode: 0666, + ModTime: time.Now(), + }); err != nil { return fmt.Errorf("write header: %s", err) } if _, err := tw.Write(b); err != nil { @@ -128,8 +160,10 @@ func (sw *SnapshotWriter) writeFileTo(tw *tar.Writer, f *SnapshotFile) error { // Write file header. 
if err := tw.WriteHeader(&tar.Header{ - Name: f.Name, - Size: f.Size, + Name: f.Name, + Size: f.Size, + Mode: 0666, + ModTime: time.Now(), }); err != nil { return fmt.Errorf("write header: file=%s, err=%s", f.Name, err) } @@ -160,39 +194,22 @@ func createServerSnapshotWriter(s *Server) (*SnapshotWriter, error) { sw := NewSnapshotWriter() if err := func() error { - // Create snapshot file for metastore. - tx, err := s.meta.db.Begin(false) + f, fw, err := createMetaSnapshotFile(s.meta) if err != nil { - return fmt.Errorf("metastore begin: %s", err) + return fmt.Errorf("create meta snapshot file: %s", err) } - name := "meta" - sw.Snapshot.Files = append(sw.Snapshot.Files, SnapshotFile{ - Name: name, - Size: tx.Size(), - Index: (&metatx{tx}).index(), - }) - sw.FileWriters[name] = &boltTxCloser{tx} + sw.Snapshot.Files = append(sw.Snapshot.Files, *f) + sw.FileWriters[f.Name] = fw // Create files for each shard. for _, sh := range s.shards { - // Ignore shard if it's not owned by the server. - if sh.store == nil { - continue - } - - // Begin transaction and create snapshot file. - tx, err := sh.store.Begin(false) + f, fw, err := createShardSnapshotFile(sh) if err != nil { - return fmt.Errorf("shard begin: id=%d, err=%s", sh.ID, err) + return fmt.Errorf("create meta snapshot file: id=%d, err=%s", sh.ID, err) + } else if f != nil { + sw.Snapshot.Files = append(sw.Snapshot.Files, *f) + sw.FileWriters[f.Name] = fw } - - name := path.Join("shards", filepath.Base(sh.store.Path())) - sw.Snapshot.Files = append(sw.Snapshot.Files, SnapshotFile{ - Name: name, - Size: tx.Size(), - Index: shardMetaIndex(tx), - }) - sw.FileWriters[name] = &boltTxCloser{tx} } return nil @@ -204,6 +221,43 @@ func createServerSnapshotWriter(s *Server) (*SnapshotWriter, error) { return sw, nil } +func createMetaSnapshotFile(meta *metastore) (*SnapshotFile, SnapshotFileWriter, error) { + // Begin transaction. + tx, err := meta.db.Begin(false) + if err != nil { + return nil, nil, fmt.Errorf("begin: %s", err) + } + + // Create and return file and writer. + f := &SnapshotFile{ + Name: "meta", + Size: tx.Size(), + Index: (&metatx{tx}).index(), + } + return f, &boltTxCloser{tx}, nil +} + +func createShardSnapshotFile(sh *Shard) (*SnapshotFile, SnapshotFileWriter, error) { + // Ignore shard if it's not owned by the server. + if sh.store == nil { + return nil, nil, nil + } + + // Begin transaction. + tx, err := sh.store.Begin(false) + if err != nil { + return nil, nil, fmt.Errorf("begin: %s", err) + } + + // Create and return file and writer. + f := &SnapshotFile{ + Name: path.Join("shards", filepath.Base(sh.store.Path())), + Size: tx.Size(), + Index: shardMetaIndex(tx), + } + return f, &boltTxCloser{tx}, nil +} + // SnapshotFileWriter is the interface used for writing a file to a snapshot. type SnapshotFileWriter interface { io.WriterTo @@ -217,3 +271,17 @@ type boltTxCloser struct { // Close rollsback the transaction. func (tx *boltTxCloser) Close() error { return tx.Rollback() } + +// NopWriteToCloser returns an io.WriterTo that implements io.Closer. 
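+// The returned value's Close is a no-op, which is useful when the snapshot contents are already buffered in memory.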
+func NopWriteToCloser(w io.WriterTo) interface { + io.WriterTo + io.Closer +} { + return &nopWriteToCloser{w} +} + +type nopWriteToCloser struct { + io.WriterTo +} + +func (w *nopWriteToCloser) Close() error { return nil } diff --git a/snapshot_test.go b/snapshot_test.go index b7149561471..58a45d1f722 100644 --- a/snapshot_test.go +++ b/snapshot_test.go @@ -5,11 +5,78 @@ import ( "bytes" "io" "io/ioutil" + "reflect" "testing" "github.com/influxdb/influxdb" ) +// Ensure a snapshot can be diff'd so that only newer files are retrieved. +func TestSnapshot_Diff(t *testing.T) { + for i, tt := range []struct { + s *influxdb.Snapshot + other *influxdb.Snapshot + result *influxdb.Snapshot + }{ + // 0. Mixed higher, lower, equal indices. + { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 1}, // remove: lower index + {Name: "b", Index: 10}, // remove: equal index + {Name: "c", Index: 21}, // keep: higher index + {Name: "d", Index: 15}, // keep: higher index + }}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + {Name: "b", Index: 10}, + {Name: "c", Index: 11}, + {Name: "d", Index: 14}, + }}, + result: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "c", Index: 21}, + {Name: "d", Index: 15}, + }}, + }, + + // 1. Files in other-only should not be added to diff. + { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + }}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 1}, + {Name: "b", Index: 10}, + }}, + result: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + }}, + }, + + // 2. Files in s-only should be added to diff. + { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + }}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{}}, + result: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Index: 2}, + }}, + }, + + // 3. Empty snapshots should return empty diffs. + { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{}}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{}}, + result: &influxdb.Snapshot{Files: nil}, + }, + } { + result := tt.s.Diff(tt.other) + if !reflect.DeepEqual(tt.result, result) { + t.Errorf("%d. mismatch:\n\nexp=%#v\n\ngot=%#v", i, tt.result, result) + } + } +} + // Ensure a snapshot writer can write a set of files to an archive func TestSnapshotWriter(t *testing.T) { // Create a new writer with a snapshot and file writers. From 11c808f55fccf594efc91552ce326f9ed5905c87 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Sun, 22 Mar 2015 14:58:40 -0600 Subject: [PATCH 3/6] Add restore and bootstrap. This commit adds the "influxd restore" command to the CLI. This allows a snapshot that has been produced by "influxd backup" to be restored to a config location and the broker and raft directories will be bootstrapped based on the state of the snapshot. 
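A typical end-to-end use of the two commands looks like the following; the host shown is the
default snapshot endpoint added by this series and the paths are placeholders:

    influxd backup -host http://127.0.0.1:8087 /tmp/node1.snapshot
    influxd restore -config /etc/influxdb.conf /tmp/node1.snapshot

The archive written by backup is expanded by restore into the configured data directory before
the broker and raft directories are bootstrapped.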
--- cmd/influxd/backup.go | 18 +- cmd/influxd/backup_test.go | 8 +- cmd/influxd/config.go | 2 +- cmd/influxd/main.go | 33 ++-- cmd/influxd/restore.go | 251 ++++++++++++++++++++++++- cmd/influxd/restore_test.go | 170 +++++++++++++++++ cmd/influxd/run.go | 11 +- cmd/influxd/server_integration_test.go | 1 + httpd/handler_test.go | 1 - messaging/broker.go | 2 +- messaging/broker_test.go | 1 - raft/log.go | 15 +- server.go | 24 ++- shard.go | 13 ++ snapshot.go | 100 +++++++++- snapshot_test.go | 45 ++--- 16 files changed, 608 insertions(+), 87 deletions(-) create mode 100644 cmd/influxd/restore_test.go diff --git a/cmd/influxd/backup.go b/cmd/influxd/backup.go index ac161b42c91..fdd14210534 100644 --- a/cmd/influxd/backup.go +++ b/cmd/influxd/backup.go @@ -67,10 +67,7 @@ func (cmd *BackupCommand) Run(args ...string) error { // parseFlags parses and validates the command line arguments. func (cmd *BackupCommand) parseFlags(args []string) (url.URL, string, error) { fs := flag.NewFlagSet("", flag.ContinueOnError) - var ( - host = fs.String("host", DefaultSnapshotURL.String(), "") - path = fs.String("output", "", "") - ) + host := fs.String("host", DefaultSnapshotURL.String(), "") fs.SetOutput(cmd.Stderr) fs.Usage = cmd.printUsage if err := fs.Parse(args); err != nil { @@ -84,11 +81,12 @@ func (cmd *BackupCommand) parseFlags(args []string) (url.URL, string, error) { } // Require output path. - if *path == "" { - return url.URL{}, "", fmt.Errorf("output path required") + path := fs.Arg(0) + if path == "" { + return url.URL{}, "", fmt.Errorf("snapshot path required") } - return *u, *path, nil + return *u, path, nil } // download downloads a snapshot from a host to a given path. @@ -123,14 +121,12 @@ func (cmd *BackupCommand) download(u url.URL, path string) error { // printUsage prints the usage message to STDERR. func (cmd *BackupCommand) printUsage() { - fmt.Fprintf(cmd.Stderr, `usage: influxd backup [flags] + fmt.Fprintf(cmd.Stderr, `usage: influxd backup [flags] PATH backup downloads a snapshot of a data node and saves it to disk. -host The host to connect to snapshot. - - -output - Path to where the snapshot will be saved. + Defaults to 127.0.0.1:8086. `) } diff --git a/cmd/influxd/backup_test.go b/cmd/influxd/backup_test.go index cf75758ec64..0ce51838b45 100644 --- a/cmd/influxd/backup_test.go +++ b/cmd/influxd/backup_test.go @@ -35,7 +35,7 @@ func TestBackupCommand(t *testing.T) { // Execute the backup against the mock server. path := tempfile() defer os.Remove(path) - if err := NewBackupCommand().Run("-host", s.URL, "-output", path); err != nil { + if err := NewBackupCommand().Run("-host", s.URL, path); err != nil { t.Fatal(err) } @@ -64,7 +64,7 @@ func TestBackupCommand_ErrInvalidHostURL(t *testing.T) { // Ensure the backup command returns an error if the output path is not specified. func TestBackupCommand_ErrPathRequired(t *testing.T) { - if err := NewBackupCommand().Run("-host", "//localhost"); err == nil || err.Error() != `output path required` { + if err := NewBackupCommand().Run("-host", "//localhost"); err == nil || err.Error() != `snapshot path required` { t.Fatal(err) } } @@ -78,7 +78,7 @@ func TestBackupCommand_ErrConnectionRefused(t *testing.T) { // Execute the backup command. 
path := tempfile() defer os.Remove(path) - if err := NewBackupCommand().Run("-host", s.URL, "-output", path); err == nil || !strings.Contains(err.Error(), `connection refused`) { + if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || !strings.Contains(err.Error(), `connection refused`) { t.Fatal(err) } } @@ -93,7 +93,7 @@ func TestBackupCommand_ErrServerError(t *testing.T) { // Execute the backup command. path := tempfile() defer os.Remove(path) - if err := NewBackupCommand().Run("-host", s.URL, "-output", path); err == nil || err.Error() != `download: snapshot error: status=500` { + if err := NewBackupCommand().Run("-host", s.URL, path); err == nil || err.Error() != `download: snapshot error: status=500` { t.Fatal(err) } } diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go index 80160b1d345..ba36447642a 100644 --- a/cmd/influxd/config.go +++ b/cmd/influxd/config.go @@ -41,7 +41,7 @@ const ( DefaultSnapshotBindAddress = "127.0.0.1" // DefaultSnapshotPort is the default port to serve snapshots from. - DefaultSnapshotPort = 8086 + DefaultSnapshotPort = 8087 // DefaultJoinURLs represents the default URLs for joining a cluster. DefaultJoinURLs = "" diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index 39f52adab14..91f95a0f47d 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -4,11 +4,13 @@ import ( "flag" "fmt" "log" + "math/rand" "os" "os/signal" "runtime" "runtime/pprof" "strings" + "time" ) const logo = ` @@ -36,6 +38,7 @@ const ( func main() { log.SetFlags(0) + rand.Seed(time.Now().UnixNano()) // If commit not set, make that clear. if commit == "" { @@ -74,7 +77,10 @@ func main() { log.Fatalf("backup: %s", err) } case "restore": - execRestore(args[1:]) + cmd := NewRestoreCommand() + if err := cmd.Run(args[1:]...); err != nil { + log.Fatalf("restore: %s", err) + } case "version": execVersion(args[1:]) case "help": @@ -109,7 +115,13 @@ func execRun(args []string) { log.SetFlags(log.LstdFlags) writePIDFile(*pidPath) - config := parseConfig(*configPath, *hostname) + // Parse configuration file from disk. + config, err := parseConfig(*configPath, *hostname) + if err != nil { + log.Fatal(err) + } else if *configPath == "" { + log.Println("No config provided, using default settings") + } // Create a logging writer. logWriter := os.Stderr @@ -128,20 +140,6 @@ func execRun(args []string) { <-(chan struct{})(nil) } -// execRestore restores a backup archive to the data directory and bootstraps the broker. -func execRestore(args []string) { - // Parse command line arguments. - fs := flag.NewFlagSet("", flag.ExitOnError) - configPath := fs.String("config", "", "") - fs.Usage = printRestoreUsage - fs.Parse(args) - - // Path to the archive is the first argument. - path := fs.Arg(0) - - Restore(path, *configPath) -} - // execVersion runs the "version" command. // Prints the commit SHA1 if set by the build process. func execVersion(args []string) { @@ -233,3 +231,6 @@ func stopProfiling() { prof.mem.Close() } } + +func warn(v ...interface{}) { fmt.Fprintln(os.Stderr, v...) } +func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) 
} diff --git a/cmd/influxd/restore.go b/cmd/influxd/restore.go index 8eab746cc8b..1d49a279073 100644 --- a/cmd/influxd/restore.go +++ b/cmd/influxd/restore.go @@ -1,20 +1,257 @@ package main import ( + "encoding/binary" + "flag" + "fmt" + "io" + "io/ioutil" "log" + "math/rand" + "net/url" + "os" + "path/filepath" + "time" + + "github.com/boltdb/bolt" + "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/raft" ) -func Restore(path, configPath string) { - log.Printf("influxdb archive, version %s, commit %s", version, commit) - panic("not yet implemented") // TODO +// RestoreCommand represents the program execution for "influxd restore". +type RestoreCommand struct { + // The logger passed to the ticker during execution. + Logger *log.Logger + + // Standard input/output, overridden for testing. + Stderr io.Writer +} + +// NewRestoreCommand returns a new instance of RestoreCommand with default settings. +func NewRestoreCommand() *RestoreCommand { + return &RestoreCommand{ + Stderr: os.Stderr, + } +} + +// Run excutes the program. +func (cmd *RestoreCommand) Run(args ...string) error { + // Set up logger. + cmd.Logger = log.New(cmd.Stderr, "", log.LstdFlags) + cmd.Logger.Printf("influxdb restore, version %s, commit %s", version, commit) + + // Parse command line arguments. + config, path, err := cmd.parseFlags(args) + if err != nil { + return err + } + + // Remove broker & data directories. + if err := os.RemoveAll(config.BrokerDir()); err != nil { + return fmt.Errorf("remove broker dir: %s", err) + } else if err := os.RemoveAll(config.DataDir()); err != nil { + return fmt.Errorf("remove data dir: %s", err) + } + + // Open snapshot file. + f, err := os.Open(path) + if err != nil { + return fmt.Errorf("open: %s", err) + } + defer f.Close() + + // Create reader and extract manifest. + sr := influxdb.NewSnapshotReader(f) + ss, err := sr.Snapshot() + if err != nil { + return fmt.Errorf("snapshot: %s", err) + } + + // Unpack snapshot files into data directory. + if err := cmd.unpack(config.DataDir(), sr); err != nil { + return fmt.Errorf("unpack: %s", err) + } + + // Generate broker & raft directories from manifest. + if err := cmd.materialize(config.BrokerDir(), ss, config.BrokerURL()); err != nil { + return fmt.Errorf("materialize: %s", err) + } + + // Notify user of completion. + cmd.Logger.Println("restore complete") + + return nil +} + +// parseFlags parses and validates the command line arguments. +func (cmd *RestoreCommand) parseFlags(args []string) (*Config, string, error) { + fs := flag.NewFlagSet("", flag.ContinueOnError) + configPath := fs.String("config", "", "") + fs.SetOutput(cmd.Stderr) + fs.Usage = cmd.printUsage + if err := fs.Parse(args); err != nil { + return nil, "", err + } + + // Parse configuration file from disk. + config, err := parseConfig(*configPath, "") + if err != nil { + log.Fatal(err) + } else if *configPath == "" { + log.Println("No config provided, using default settings") + } + + // Require output path. + path := fs.Arg(0) + if path == "" { + return nil, "", fmt.Errorf("snapshot path required") + } + + return config, path, nil } -func printRestoreUsage() { - log.Printf(`usage: influxd restore [flags] PATH +// unpack expands the files in the snapshot archive into a directory. +func (cmd *RestoreCommand) unpack(path string, sr *influxdb.SnapshotReader) error { + // Create root directory. + if err := os.MkdirAll(path, 0777); err != nil { + return fmt.Errorf("mkdir: err=%s", err) + } + + // Loop over files and extract. + for { + // Read entry header. 
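+		// Next returns io.EOF once the last file in the archive has been read.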
+ sf, err := sr.Next() + if err == io.EOF { + break + } else if err != nil { + return fmt.Errorf("next: entry=%s, err=%s", sf.Name, err) + } + + // Create parent directory for output file. + if err := os.MkdirAll(filepath.Dir(filepath.Join(path, sf.Name)), 0777); err != nil { + return fmt.Errorf("mkdir: entry=%s, err=%s", sf.Name, err) + } -restore expands an archive into the data directory specified by the config. + if err := func() error { + // Create output file. + f, err := os.Create(filepath.Join(path, sf.Name)) + if err != nil { + return fmt.Errorf("create: entry=%s, err=%s", sf.Name, err) + } + defer f.Close() + + // Copy contents from reader. + if _, err := io.CopyN(f, sr, sf.Size); err != nil { + return fmt.Errorf("copy: entry=%s, err=%s", sf.Name, err) + } + + return nil + }(); err != nil { + return err + } + } + + return nil +} + +// materialize creates broker & raft directories based on the snapshot. +func (cmd *RestoreCommand) materialize(path string, ss *influxdb.Snapshot, u url.URL) error { + // Materialize broker. + if err := cmd.materializeBroker(path, ss.Index()); err != nil { + return fmt.Errorf("broker: %s", err) + } + + // Materialize raft. + if err := cmd.materializeRaft(filepath.Join(path, "raft"), u); err != nil { + return fmt.Errorf("raft: %s", err) + } + + return nil +} + +func (cmd *RestoreCommand) materializeBroker(path string, index uint64) error { + // Create root directory. + if err := os.MkdirAll(path, 0777); err != nil { + return fmt.Errorf("mkdir: err=%s", err) + } + + // Create broker meta store. + meta, err := bolt.Open(filepath.Join(path, "meta"), 0666, &bolt.Options{Timeout: 1 * time.Second}) + if err != nil { + return fmt.Errorf("open broker meta: %s", err) + } + defer meta.Close() + + // Write highest index to meta store. + if err := meta.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte("meta")) + if err != nil { + return fmt.Errorf("create meta bucket: %s", err) + } + + if err := b.Put([]byte("index"), u64tob(index)); err != nil { + return fmt.Errorf("put: %s", err) + } + return nil + }); err != nil { + return fmt.Errorf("update broker meta: %s", err) + } + + return nil +} + +func (cmd *RestoreCommand) materializeRaft(path string, u url.URL) error { + // Create raft directory. + if err := os.MkdirAll(path, 0777); err != nil { + return fmt.Errorf("mkdir raft: err=%s", err) + } + + // Write raft id & term. + if err := ioutil.WriteFile(filepath.Join(path, "id"), []byte(`1`), 0666); err != nil { + return fmt.Errorf("write raft/id: %s", err) + } + if err := ioutil.WriteFile(filepath.Join(path, "term"), []byte(`1`), 0666); err != nil { + return fmt.Errorf("write raft/term: %s", err) + } + + // Generate configuration. + var rc raft.Config + rc.ClusterID = uint64(rand.Int()) + rc.MaxNodeID = 1 + rc.AddNode(1, u) + + // Marshal config. + f, err := os.Create(filepath.Join(path, "config")) + if err != nil { + return fmt.Errorf("create config: %s", err) + } + defer f.Close() + + // Write config. + if err := raft.NewConfigEncoder(f).Encode(&rc); err != nil { + return fmt.Errorf("encode config: %s", err) + } + + return nil +} + +// printUsage prints the usage message to STDERR. +func (cmd *RestoreCommand) printUsage() { + fmt.Fprintf(cmd.Stderr, `usage: influxd restore [flags] PATH + +restore uses a snapshot of a data node to rebuild a cluster. -config - The path to the configuration file. + Set the path to the configuration file. `) } + +// u64tob converts a uint64 into an 8-byte slice. 
+func u64tob(v uint64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, v) + return b +} + +// btou64 converts an 8-byte slice into an uint64. +func btou64(b []byte) uint64 { return binary.BigEndian.Uint64(b) } diff --git a/cmd/influxd/restore_test.go b/cmd/influxd/restore_test.go new file mode 100644 index 00000000000..8b073dc4743 --- /dev/null +++ b/cmd/influxd/restore_test.go @@ -0,0 +1,170 @@ +package main_test + +import ( + "bytes" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "reflect" + "testing" + "time" + + "github.com/influxdb/influxdb" + "github.com/influxdb/influxdb/cmd/influxd" +) + +// Ensure the restore command can expand a snapshot and bootstrap a broker. +func TestRestoreCommand(t *testing.T) { + now := time.Now() + + // Create root path to server. + path := tempfile() + defer os.Remove(path) + + // Create a config template that can use different ports. + var configString = fmt.Sprintf(` + [broker] + port=%%d + dir=%q + + [data] + port=%%d + dir = %q + + [snapshot] + port=%%d + `, + filepath.Join(path, "broker"), + filepath.Join(path, "data"), + ) + + // Create configuration file. + configPath := tempfile() + defer os.Remove(configPath) + + // Parse configuration. + MustWriteFile(configPath, []byte(fmt.Sprintf(configString, 8900, 8900, 8901))) + c, err := main.ParseConfigFile(configPath) + if err != nil { + t.Fatalf("parse config: %s", err) + } + + // Start server. + b, s := main.Run(c, "", "x.x", os.Stderr) + if b == nil { + t.Fatal("cannot run broker") + } else if s == nil { + t.Fatal("cannot run server") + } + + // Create data. + if err := s.CreateDatabase("db"); err != nil { + t.Fatalf("cannot create database: %s", err) + } + if index, err := s.WriteSeries("db", "default", []influxdb.Point{{Name: "cpu", Timestamp: now, Fields: map[string]interface{}{"value": float64(100)}}}); err != nil { + t.Fatalf("cannot write series: %s", err) + } else if err = s.Sync(1, index); err != nil { + t.Fatalf("shard sync: %s", err) + } + + // Create snapshot writer. + sw, err := s.CreateSnapshotWriter() + if err != nil { + t.Fatalf("create snapshot writer: %s", err) + } + + // Snapshot to file. + sspath := tempfile() + f, err := os.Create(sspath) + if err != nil { + t.Fatal(err) + } + sw.WriteTo(f) + f.Close() + + // Stop server. + s.Close() + b.Close() + + // Remove data & broker directories. + if err := os.RemoveAll(path); err != nil { + t.Fatalf("remove: %s", err) + } + + // Rewrite config to a new port and re-parse. + MustWriteFile(configPath, []byte(fmt.Sprintf(configString, 8910, 8910, 8911))) + c, err = main.ParseConfigFile(configPath) + if err != nil { + t.Fatalf("parse config: %s", err) + } + + // Execute the restore. + if err := NewRestoreCommand().Run("-config", configPath, sspath); err != nil { + t.Fatal(err) + } + + // Restart server. + b, s = main.Run(c, "", "x.x", os.Stderr) + if b == nil { + t.Fatal("cannot run broker") + } else if s == nil { + t.Fatal("cannot run server") + } + + // Write new data. + if err := s.CreateDatabase("newdb"); err != nil { + t.Fatalf("cannot create new database: %s", err) + } + if index, err := s.WriteSeries("newdb", "default", []influxdb.Point{{Name: "mem", Timestamp: now, Fields: map[string]interface{}{"value": float64(1000)}}}); err != nil { + t.Fatalf("cannot write new series: %s", err) + } else if err = s.Sync(2, index); err != nil { + t.Fatalf("shard sync: %s", err) + } + + // Read series data. 
+ if v, err := s.ReadSeries("db", "default", "cpu", nil, now); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(v, map[string]interface{}{"value": float64(100)}) { + t.Fatalf("read series(0) mismatch: %#v", v) + } + + // Read new series data. + if v, err := s.ReadSeries("newdb", "default", "mem", nil, now); err != nil { + t.Fatal(err) + } else if !reflect.DeepEqual(v, map[string]interface{}{"value": float64(1000)}) { + t.Fatalf("read series(1) mismatch: %#v", v) + } +} + +// RestoreCommand is a test wrapper for main.RestoreCommand. +type RestoreCommand struct { + *main.RestoreCommand + Stderr bytes.Buffer +} + +// NewRestoreCommand returns a new instance of RestoreCommand. +func NewRestoreCommand() *RestoreCommand { + cmd := &RestoreCommand{RestoreCommand: main.NewRestoreCommand()} + cmd.RestoreCommand.Stderr = &cmd.Stderr + return cmd +} + +// MustReadFile reads data from a file. Panic on error. +func MustReadFile(filename string) []byte { + b, err := ioutil.ReadFile(filename) + if err != nil { + panic(err.Error()) + } + return b +} + +// MustWriteFile writes data to a file. Panic on error. +func MustWriteFile(filename string, data []byte) { + if err := os.MkdirAll(filepath.Dir(filename), 0777); err != nil { + panic(err.Error()) + } + if err := ioutil.WriteFile(filename, data, 0666); err != nil { + panic(err.Error()) + } +} diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go index 16cceb8bdb5..01a67447930 100644 --- a/cmd/influxd/run.go +++ b/cmd/influxd/run.go @@ -244,17 +244,16 @@ func writePIDFile(path string) { } } -// parses the configuration from a given path. Sets overrides as needed. -func parseConfig(path, hostname string) *Config { +// parseConfig parses the configuration from a given path. Sets overrides as needed. +func parseConfig(path, hostname string) (*Config, error) { if path == "" { - log.Println("No config provided, using default settings") - return NewConfig() + return NewConfig(), nil } // Parse configuration. config, err := ParseConfigFile(path) if err != nil { - log.Fatalf("config: %s", err) + return nil, fmt.Errorf("config: %s", err) } // Override config properties. @@ -262,7 +261,7 @@ func parseConfig(path, hostname string) *Config { config.Hostname = hostname } - return config + return config, nil } // creates and initializes a broker. diff --git a/cmd/influxd/server_integration_test.go b/cmd/influxd/server_integration_test.go index c5b08d6c94c..2bdd2eb545e 100644 --- a/cmd/influxd/server_integration_test.go +++ b/cmd/influxd/server_integration_test.go @@ -96,6 +96,7 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(basePort)) c.Broker.Port = basePort c.Data.Port = basePort + c.Snapshot.Port = basePort + 1 c.Admin.Enabled = false c.ReportingDisabled = true diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 1f5f911b2f9..230c841bb08 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -6,7 +6,6 @@ import ( "encoding/base64" "encoding/json" "fmt" - "io" "io/ioutil" "net/http" "net/http/httptest" diff --git a/messaging/broker.go b/messaging/broker.go index e26745c1a84..b3f45a0be6a 100644 --- a/messaging/broker.go +++ b/messaging/broker.go @@ -928,7 +928,7 @@ func ReadSegmentByIndex(path string, index uint64) (*Segment, error) { } else if index == 0 { return segments[0], nil } else if index < segments[0].Index { - return nil, ErrSegmentReclaimed + return segments[0], nil } // Find segment that contains index. 
diff --git a/messaging/broker_test.go b/messaging/broker_test.go index f524af1dbf5..d0ae7765e9c 100644 --- a/messaging/broker_test.go +++ b/messaging/broker_test.go @@ -407,7 +407,6 @@ func TestReadSegmentByIndex(t *testing.T) { {index: 19, segmentIndex: 12}, {index: 20, segmentIndex: 20}, {index: 21, segmentIndex: 20}, - {index: 5, segmentIndex: 6, err: messaging.ErrSegmentReclaimed}, } { segment, err := messaging.ReadSegmentByIndex(path, tt.index) if tt.err != nil { diff --git a/raft/log.go b/raft/log.go index e61cb343d53..f9f4c198d50 100644 --- a/raft/log.go +++ b/raft/log.go @@ -293,7 +293,7 @@ func (l *Log) Open(path string) error { c, err := l.readConfig() if err != nil { _ = l.close() - return err + return fmt.Errorf("read config: %s", err) } l.config = c @@ -439,18 +439,17 @@ func (l *Log) writeTerm(term uint64) error { func (l *Log) readConfig() (*Config, error) { // Read config from disk. f, err := os.Open(l.configPath()) - if err != nil && !os.IsNotExist(err) { + if os.IsNotExist(err) { + return nil, nil + } else if err != nil { return nil, err } defer func() { _ = f.Close() }() // Marshal file to a config type. - var config *Config - if f != nil { - config = &Config{} - if err := NewConfigDecoder(f).Decode(config); err != nil { - return nil, err - } + config := &Config{} + if err := NewConfigDecoder(f).Decode(config); err != nil { + return nil, err } return config, nil } diff --git a/server.go b/server.go index e8bd8c9d7d0..a2972d20d6c 100644 --- a/server.go +++ b/server.go @@ -506,14 +506,34 @@ func (s *Server) broadcast(typ messaging.MessageType, c interface{}) (uint64, er } // Wait for the server to receive the message. - err = s.Sync(index) + err = s.Sync(BroadcastTopicID, index) return index, err } // Sync blocks until a given index (or a higher index) has been applied. // Returns any error associated with the command. -func (s *Server) Sync(index uint64) error { +func (s *Server) Sync(topicID, index uint64) error { + // Sync to the broadcast topic if specified. + if topicID == 0 { + return s.syncBroadcast(index) + } + + // Otherwise retrieve shard by id. + s.mu.RLock() + sh := s.shards[topicID] + s.mu.RUnlock() + + // Return error if there is no shard. + if sh == nil || sh.store == nil { + return errors.New("shard not owned") + } + + return sh.sync(index) +} + +// syncBroadcast syncs the broadcast topic. +func (s *Server) syncBroadcast(index uint64) error { for { // Check if index has occurred. If so, retrieve the error and return. s.mu.RLock() diff --git a/shard.go b/shard.go index 0cb2eeb5c06..0f33c300b00 100644 --- a/shard.go +++ b/shard.go @@ -139,6 +139,19 @@ func (s *Shard) close() error { return nil } +// sync returns after a given index has been reached. +func (s *Shard) sync(index uint64) error { + for { + // Check if index has occurred. + if s.index >= index { + return nil + } + + // Otherwise wait momentarily and check again. + time.Sleep(1 * time.Millisecond) + } +} + // HasDataNodeID return true if the data node owns the shard. func (s *Shard) HasDataNodeID(id uint64) bool { for _, dataNodeID := range s.DataNodeIDs { diff --git a/snapshot.go b/snapshot.go index c602db7fe4c..3b5836ee447 100644 --- a/snapshot.go +++ b/snapshot.go @@ -12,11 +12,25 @@ import ( "github.com/boltdb/bolt" ) +// manifestName is the name of the manifest file in the snapshot. +const manifestName = "manifest" + // Snapshot represents the state of the Server at a given time. type Snapshot struct { Files []SnapshotFile `json:"files"` } +// Index returns the highest index across all files. 
+func (s *Snapshot) Index() uint64 { + var index uint64 + for _, f := range s.Files { + if f.Index > index { + index = f.Index + } + } + return index +} + // Diff returns a Snapshot of files that are newer in s than other. func (s *Snapshot) Diff(other *Snapshot) *Snapshot { diff := &Snapshot{} @@ -50,8 +64,90 @@ type SnapshotFile struct { Index uint64 `json:"index"` // highest index applied } -// SnapshotWriter writes a snapshot and the underlying files to disk as a tar archive. +// SnapshotReader reads a snapshot from a Reader. // This type is not safe for concurrent use. +type SnapshotReader struct { + tr *tar.Reader + snapshot *Snapshot +} + +// NewSnapshotReader returns a new SnapshotReader reading from r. +func NewSnapshotReader(r io.Reader) *SnapshotReader { + return &SnapshotReader{ + tr: tar.NewReader(r), + } +} + +// Snapshot returns the snapshot meta data. +func (sr *SnapshotReader) Snapshot() (*Snapshot, error) { + if err := sr.readSnapshot(); err != nil { + return nil, err + } + return sr.snapshot, nil +} + +// readSnapshot reads the first entry from the snapshot and materializes the snapshot. +// This is skipped if the snapshot manifest has already been read. +func (sr *SnapshotReader) readSnapshot() error { + // Already read, ignore. + if sr.snapshot != nil { + return nil + } + + // Read manifest header. + hdr, err := sr.tr.Next() + if err != nil { + return fmt.Errorf("snapshot header: %s", err) + } else if hdr.Name != manifestName { + return fmt.Errorf("invalid snapshot header: expected manifest") + } + + // Materialize snapshot. + var snapshot Snapshot + if err := json.NewDecoder(sr.tr).Decode(&snapshot); err != nil { + return fmt.Errorf("decode manifest: %s", err) + } + sr.snapshot = &snapshot + + return nil +} + +// Next returns the next file in the snapshot. +func (sr *SnapshotReader) Next() (SnapshotFile, error) { + // Read snapshot if it hasn't been read yet. + if err := sr.readSnapshot(); err != nil { + return SnapshotFile{}, err + } + + // Read next header. + hdr, err := sr.tr.Next() + if err != nil { + return SnapshotFile{}, err + } + + // Match header to file in snapshot. + for i := range sr.snapshot.Files { + if sr.snapshot.Files[i].Name == hdr.Name { + return sr.snapshot.Files[i], nil + } + } + + // Return error if file is not in the snapshot. + return SnapshotFile{}, fmt.Errorf("snapshot entry not found in manifest: %s", hdr.Name) +} + +// Read reads the current entry in the snapshot. +func (sr *SnapshotReader) Read(b []byte) (n int, err error) { + // Read snapshot if it hasn't been read yet. + if err := sr.readSnapshot(); err != nil { + return 0, err + } + + // Pass read through to the tar reader. + return sr.tr.Read(b) +} + +// SnapshotWriter writes a snapshot and the underlying files to disk as a tar archive. type SnapshotWriter struct { // The snapshot to write from. // Removing files from the snapshot after creation will cause those files to be ignored. @@ -136,7 +232,7 @@ func (sw *SnapshotWriter) writeManifestTo(tw *tar.Writer) error { // Write header & file. if err := tw.WriteHeader(&tar.Header{ - Name: "manifest", + Name: manifestName, Size: int64(len(b)), Mode: 0666, ModTime: time.Now(), diff --git a/snapshot_test.go b/snapshot_test.go index 58a45d1f722..fe3fd347f1e 100644 --- a/snapshot_test.go +++ b/snapshot_test.go @@ -1,7 +1,6 @@ package influxdb_test import ( - "archive/tar" "bytes" "io" "io/ioutil" @@ -106,44 +105,36 @@ func TestSnapshotWriter(t *testing.T) { t.Fatal(err) } - // Read archive from buffer. 
- tr := tar.NewReader(&buf) - - // First file should be the manifest. - if hdr, err := tr.Next(); err != nil { - t.Fatalf("unexpected error(manifest): %s", err) - } else if hdr.Name != "manifest" { - t.Fatalf("unexpected header name(manifest): %s", hdr.Name) - } else if hdr.Size != 87 { - t.Fatalf("unexpected header size(manifest): %d", hdr.Size) - } else if b := MustReadAll(tr); string(b) != `{"files":[{"name":"meta","size":3,"index":12},{"name":"shards/1","size":5,"index":15}]}` { - t.Fatalf("unexpected file(manifest): %s", b) + // Read snapshot from buffer. + sr := influxdb.NewSnapshotReader(&buf) + + // Read the manifest. + if ss, err := sr.Snapshot(); err != nil { + t.Fatalf("unexpected error(snapshot): %s", err) + } else if !reflect.DeepEqual(sw.Snapshot, ss) { + t.Fatalf("snapshot mismatch:\n\nexp=%#v\n\ngot=%#v", sw.Snapshot, ss) } // Next should be the meta file. - if hdr, err := tr.Next(); err != nil { + if f, err := sr.Next(); err != nil { t.Fatalf("unexpected error(meta): %s", err) - } else if hdr.Name != "meta" { - t.Fatalf("unexpected header name(meta): %s", hdr.Name) - } else if hdr.Size != 3 { - t.Fatalf("unexpected header size(meta): %d", hdr.Size) - } else if b := MustReadAll(tr); string(b) != `foo` { + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "meta", Size: 3, Index: 12}) { + t.Fatalf("file mismatch(meta): %#v", f) + } else if b := MustReadAll(sr); string(b) != `foo` { t.Fatalf("unexpected file(meta): %s", b) } // Next should be the shard file. - if hdr, err := tr.Next(); err != nil { + if f, err := sr.Next(); err != nil { t.Fatalf("unexpected error(shards/1): %s", err) - } else if hdr.Name != "shards/1" { - t.Fatalf("unexpected header name(shards/1): %s", hdr.Name) - } else if hdr.Size != 5 { - t.Fatalf("unexpected header size(shards/1): %d", hdr.Size) - } else if b := MustReadAll(tr); string(b) != `55555` { + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "shards/1", Size: 5, Index: 15}) { + t.Fatalf("file mismatch(shards/1): %#v", f) + } else if b := MustReadAll(sr); string(b) != `55555` { t.Fatalf("unexpected file(shards/1): %s", b) } - // Check for end of archive. - if _, err := tr.Next(); err != io.EOF { + // Check for end of snapshot. + if _, err := sr.Next(); err != io.EOF { t.Fatalf("expected EOF: %s", err) } } From 29cb550d95de31f0405f26b3ea257051423ad5b0 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Mon, 23 Mar 2015 16:06:29 -0600 Subject: [PATCH 4/6] Code review fixes. --- cmd/influxd/backup.go | 2 +- httpd/handler_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/influxd/backup.go b/cmd/influxd/backup.go index fdd14210534..b7fb27b9798 100644 --- a/cmd/influxd/backup.go +++ b/cmd/influxd/backup.go @@ -127,6 +127,6 @@ backup downloads a snapshot of a data node and saves it to disk. -host The host to connect to snapshot. - Defaults to 127.0.0.1:8086. + Defaults to 127.0.0.1:8087. `) } diff --git a/httpd/handler_test.go b/httpd/handler_test.go index caa7d853b1a..c9146439148 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -1639,7 +1639,7 @@ func TestSnapshotHandler(t *testing.T) { w := httptest.NewRecorder() h.ServeHTTP(w, nil) - // Verify snapshot was to response. + // Verify status code is successful and the snapshot was written. 
if w.Code != http.StatusOK { t.Fatalf("unexpected status: %d", w.Code) } else if w.Body == nil { From 2401e69f5861b4b34219238b6fafa6aa6309b67e Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 24 Mar 2015 15:57:03 -0600 Subject: [PATCH 5/6] Add incremental backups. This commit adds incremental backup support. Snapshotting from the server now creates a full backup if one does not exist and creates numbered incremental backups after that. For example, if you ran: $ influxd backup /tmp/snapshot Then you'll see a full snapshot in /tmp/snapshot. If you run the same command again then an incremental snapshot will be created at /tmp/snapshot.0. Running it again will create /tmp/snapshot.1. --- cmd/influxd/backup.go | 60 ++++++++- cmd/influxd/backup_test.go | 18 ++- cmd/influxd/restore.go | 25 ++-- httpd/handler.go | 10 +- httpd/handler_test.go | 17 ++- snapshot.go | 258 +++++++++++++++++++++++++++++++++++++ snapshot_test.go | 107 +++++++++++++++ 7 files changed, 471 insertions(+), 24 deletions(-) diff --git a/cmd/influxd/backup.go b/cmd/influxd/backup.go index b7fb27b9798..e49db9f1bbc 100644 --- a/cmd/influxd/backup.go +++ b/cmd/influxd/backup.go @@ -1,6 +1,8 @@ package main import ( + "bytes" + "encoding/json" "flag" "fmt" "io" @@ -8,6 +10,8 @@ import ( "net/http" "net/url" "os" + + "github.com/influxdb/influxdb" ) // BackupSuffix is a suffix added to the backup while it's in-process. @@ -41,13 +45,25 @@ func (cmd *BackupCommand) Run(args ...string) error { return err } - // TODO: Check highest index from local version. + // Retrieve snapshot from local file. + ss, err := influxdb.ReadFileSnapshot(path) + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("read file snapshot: %s", err) + } // Determine temporary path to download to. tmppath := path + BackupSuffix + // Calculate path of next backup file. + // This uses the path if it doesn't exist. + // Otherwise it appends an autoincrementing number. + path, err = cmd.nextPath(path) + if err != nil { + return fmt.Errorf("next path: %s", err) + } + // Retrieve snapshot. - if err := cmd.download(u, tmppath); err != nil { + if err := cmd.download(u, ss, tmppath); err != nil { return fmt.Errorf("download: %s", err) } @@ -89,8 +105,28 @@ func (cmd *BackupCommand) parseFlags(args []string) (url.URL, string, error) { return *u, path, nil } +// nextPath returns the next file to write to. +func (cmd *BackupCommand) nextPath(path string) (string, error) { + // Use base path if it doesn't exist. + if _, err := os.Stat(path); os.IsNotExist(err) { + return path, nil + } else if err != nil { + return "", err + } + + // Otherwise iterate through incremental files until one is available. + for i := 0; ; i++ { + s := fmt.Sprintf(path+".%d", i) + if _, err := os.Stat(s); os.IsNotExist(err) { + return s, nil + } else if err != nil { + return "", err + } + } +} + // download downloads a snapshot from a host to a given path. -func (cmd *BackupCommand) download(u url.URL, path string) error { +func (cmd *BackupCommand) download(u url.URL, ss *influxdb.Snapshot, path string) error { // Create local file to write to. f, err := os.Create(path) if err != nil { @@ -98,9 +134,23 @@ func (cmd *BackupCommand) download(u url.URL, path string) error { } defer f.Close() - // Fetch the archive from the server. + // Encode snapshot. + var buf bytes.Buffer + if ss != nil { + if err := json.NewEncoder(&buf).Encode(ss); err != nil { + return fmt.Errorf("encode snapshot: %s", err) + } + } + + // Create request with existing snapshot as the body. 
 	u.Path = "/snapshot"
-	resp, err := http.Get(u.String())
+	req, err := http.NewRequest("GET", u.String(), &buf)
+	if err != nil {
+		return fmt.Errorf("new request: %s", err)
+	}
+
+	// Fetch the archive from the server.
+	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
 		return fmt.Errorf("get: %s", err)
 	}
diff --git a/cmd/influxd/backup_test.go b/cmd/influxd/backup_test.go
index 0ce51838b45..10d5d3e989e 100644
--- a/cmd/influxd/backup_test.go
+++ b/cmd/influxd/backup_test.go
@@ -32,16 +32,26 @@ func TestBackupCommand(t *testing.T) {
 	}))
 	defer s.Close()

-	// Execute the backup against the mock server.
+	// Create a temp path and remove incremental backups at the end.
 	path := tempfile()
 	defer os.Remove(path)
+	defer os.Remove(path + ".0")
+	defer os.Remove(path + ".1")
+
+	// Execute the backup against the mock server.
+	for i := 0; i < 3; i++ {
+		if err := NewBackupCommand().Run("-host", s.URL, path); err != nil {
+			t.Fatal(err)
+		}
 	}

-	// Verify snapshot was written to path.
+	// Verify snapshot and two incremental snapshots were written.
 	if _, err := os.Stat(path); err != nil {
 		t.Fatalf("snapshot not found: %s", err)
+	} else if _, err = os.Stat(path + ".0"); err != nil {
+		t.Fatalf("incremental snapshot(0) not found: %s", err)
+	} else if _, err = os.Stat(path + ".1"); err != nil {
+		t.Fatalf("incremental snapshot(1) not found: %s", err)
 	}
 }
diff --git a/cmd/influxd/restore.go b/cmd/influxd/restore.go
index 1d49a279073..dc4c26ed8c4 100644
--- a/cmd/influxd/restore.go
+++ b/cmd/influxd/restore.go
@@ -53,22 +53,21 @@ func (cmd *RestoreCommand) Run(args ...string) error {
 		return fmt.Errorf("remove data dir: %s", err)
 	}

-	// Open snapshot file.
-	f, err := os.Open(path)
+	// Open snapshot file and all incremental backups.
+	ssr, files, err := influxdb.OpenFileSnapshotsReader(path)
 	if err != nil {
 		return fmt.Errorf("open: %s", err)
 	}
-	defer f.Close()
+	defer closeAll(files)

-	// Create reader and extract manifest.
-	sr := influxdb.NewSnapshotReader(f)
-	ss, err := sr.Snapshot()
+	// Extract manifest.
+	ss, err := ssr.Snapshot()
 	if err != nil {
 		return fmt.Errorf("snapshot: %s", err)
 	}

 	// Unpack snapshot files into data directory.
-	if err := cmd.unpack(config.DataDir(), sr); err != nil {
+	if err := cmd.unpack(config.DataDir(), ssr); err != nil {
 		return fmt.Errorf("unpack: %s", err)
 	}
@@ -110,8 +109,14 @@ func (cmd *RestoreCommand) parseFlags(args []string) (*Config, string, error) {
 	return config, path, nil
 }

+func closeAll(a []io.Closer) {
+	for _, c := range a {
+		_ = c.Close()
+	}
+}
+
 // unpack expands the files in the snapshot archive into a directory.
-func (cmd *RestoreCommand) unpack(path string, sr *influxdb.SnapshotReader) error {
+func (cmd *RestoreCommand) unpack(path string, ssr *influxdb.SnapshotsReader) error {
 	// Create root directory.
 	if err := os.MkdirAll(path, 0777); err != nil {
 		return fmt.Errorf("mkdir: err=%s", err)
 	}
@@ -120,7 +125,7 @@ func (cmd *RestoreCommand) unpack(path string, sr *influxdb.SnapshotReader) erro
 	// Loop over files and extract.
 	for {
 		// Read entry header.
-		sf, err := sr.Next()
+		sf, err := ssr.Next()
 		if err == io.EOF {
 			break
 		} else if err != nil {
@@ -141,7 +146,7 @@ func (cmd *RestoreCommand) unpack(path string, sr *influxdb.SnapshotReader) erro
 			defer f.Close()

 			// Copy contents from reader.
- if _, err := io.CopyN(f, sr, sf.Size); err != nil { + if _, err := io.CopyN(f, ssr, sf.Size); err != nil { return fmt.Errorf("copy: entry=%s, err=%s", sf.Name, err) } diff --git a/httpd/handler.go b/httpd/handler.go index b636e9c332c..ac75dcb11b4 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -778,6 +778,13 @@ type SnapshotHandler struct { } func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Read in previous snapshot from request body. + var prev influxdb.Snapshot + if err := json.NewDecoder(r.Body).Decode(&prev); err != nil && err != io.EOF { + httpError(w, "error reading previous snapshot: "+err.Error(), false, http.StatusBadRequest) + return + } + // Retrieve a snapshot from the server. sw, err := h.CreateSnapshotWriter() if err != nil { @@ -786,7 +793,8 @@ func (h *SnapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } defer sw.Close() - // TODO: Subtract existing snapshot from writer. + // Subtract existing snapshot from writer. + sw.Snapshot = sw.Snapshot.Diff(&prev) // Write to response. if _, err := sw.WriteTo(w); err != nil { diff --git a/httpd/handler_test.go b/httpd/handler_test.go index c9146439148..b016511ec96 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -1627,17 +1627,26 @@ func TestSnapshotHandler(t *testing.T) { h.CreateSnapshotWriter = func() (*influxdb.SnapshotWriter, error) { return &influxdb.SnapshotWriter{ Snapshot: &influxdb.Snapshot{ - Files: []influxdb.SnapshotFile{{Name: "meta", Size: 5, Index: 12}}, + Files: []influxdb.SnapshotFile{ + {Name: "meta", Size: 5, Index: 12}, + {Name: "shards/1", Size: 6, Index: 15}, + }, }, FileWriters: map[string]influxdb.SnapshotFileWriter{ - "meta": influxdb.NopWriteToCloser(bytes.NewBufferString("55555")), + "meta": influxdb.NopWriteToCloser(bytes.NewBufferString("55555")), + "shards/1": influxdb.NopWriteToCloser(bytes.NewBufferString("666666")), }, }, nil } - // Execute handler. + // Execute handler with an existing snapshot to diff. + // The "shards/1" has a higher index in the diff so it won't be included in the snapshot. w := httptest.NewRecorder() - h.ServeHTTP(w, nil) + r, _ := http.NewRequest( + "GET", "http://localhost/snapshot", + strings.NewReader(`{"files":[{"name":"meta","index":10},{"name":"shards/1","index":20}]}`), + ) + h.ServeHTTP(w, r) // Verify status code is successful and the snapshot was written. if w.Code != http.StatusOK { diff --git a/snapshot.go b/snapshot.go index 3b5836ee447..4ef85e510d9 100644 --- a/snapshot.go +++ b/snapshot.go @@ -5,8 +5,10 @@ import ( "encoding/json" "fmt" "io" + "os" "path" "path/filepath" + "sort" "time" "github.com/boltdb/bolt" @@ -54,9 +56,45 @@ loop: diff.Files = append(diff.Files, a) } + // Sort files. + sort.Sort(SnapshotFiles(diff.Files)) + return diff } +// Merge returns a Snapshot that combines s with other. +// Only the newest file between the two snapshots is returned. +func (s *Snapshot) Merge(other *Snapshot) *Snapshot { + ret := &Snapshot{} + ret.Files = make([]SnapshotFile, len(s.Files)) + copy(ret.Files, s.Files) + + // Update/insert versions of files that are newer in other. +loop: + for _, a := range other.Files { + for i, b := range ret.Files { + // Ignore if it doesn't match. + if a.Name != b.Name { + continue + } + + // Update if it's newer and then start the next file. + if a.Index > b.Index { + ret.Files[i] = a + } + continue loop + } + + // If the file wasn't found then append it. + ret.Files = append(ret.Files, a) + } + + // Sort files. 
+ sort.Sort(SnapshotFiles(ret.Files)) + + return ret +} + // SnapshotFile represents a single file in a Snapshot. type SnapshotFile struct { Name string `json:"name"` // filename @@ -64,6 +102,13 @@ type SnapshotFile struct { Index uint64 `json:"index"` // highest index applied } +// SnapshotFiles represents a sortable list of snapshot files. +type SnapshotFiles []SnapshotFile + +func (p SnapshotFiles) Len() int { return len(p) } +func (p SnapshotFiles) Less(i, j int) bool { return p[i].Name < p[j].Name } +func (p SnapshotFiles) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + // SnapshotReader reads a snapshot from a Reader. // This type is not safe for concurrent use. type SnapshotReader struct { @@ -147,6 +192,215 @@ func (sr *SnapshotReader) Read(b []byte) (n int, err error) { return sr.tr.Read(b) } +// SnapshotsReader reads from a collection of snapshots. +// Only files with the highest index are read from the reader. +// This type is not safe for concurrent use. +type SnapshotsReader struct { + readers []*SnapshotReader // underlying snapshot readers + files []*SnapshotFile // current file for each reader + + snapshot *Snapshot // combined snapshot from all readers + index int // index of file in snapshot to read + curr *SnapshotReader // current reader +} + +// NewSnapshotsReader returns a new SnapshotsReader reading from a list of readers. +func NewSnapshotsReader(readers ...io.Reader) *SnapshotsReader { + r := &SnapshotsReader{ + readers: make([]*SnapshotReader, len(readers)), + files: make([]*SnapshotFile, len(readers)), + index: -1, + } + for i := range readers { + r.readers[i] = NewSnapshotReader(readers[i]) + } + return r +} + +// Snapshot returns the combined snapshot from all readers. +func (ssr *SnapshotsReader) Snapshot() (*Snapshot, error) { + // Use snapshot if it's already been calculated. + if ssr.snapshot != nil { + return ssr.snapshot, nil + } + + // Build snapshot from other readers. + ss := &Snapshot{} + for i, sr := range ssr.readers { + other, err := sr.Snapshot() + if err != nil { + return nil, fmt.Errorf("snapshot: idx=%d, err=%s", i, err) + } + ss = ss.Merge(other) + } + + // Cache snapshot and return. + ssr.snapshot = ss + return ss, nil +} + +// Next returns the next file in the reader. +func (ssr *SnapshotsReader) Next() (SnapshotFile, error) { + ss, err := ssr.Snapshot() + if err != nil { + return SnapshotFile{}, fmt.Errorf("snapshot: %s", err) + } + + // Return EOF if there are no more files in snapshot. + if ssr.index == len(ss.Files)-1 { + ssr.curr = nil + return SnapshotFile{}, io.EOF + } + + // Queue up next files. + if err := ssr.nextFiles(); err != nil { + return SnapshotFile{}, fmt.Errorf("next files: %s", err) + } + + // Increment the file index. + ssr.index++ + sf := ss.Files[ssr.index] + + // Find the matching reader. Clear other readers. + var sr *SnapshotReader + for i, f := range ssr.files { + if f == nil || f.Name != sf.Name { + continue + } + + // Set reader to the first match. + if sr == nil && *f == sf { + sr = ssr.readers[i] + } + ssr.files[i] = nil + } + + // Return an error if file doesn't match. + // This shouldn't happen unless the underlying snapshot is altered. + if sr == nil { + return SnapshotFile{}, fmt.Errorf("snaphot file not found in readers: %s", sf.Name) + } + + // Set current reader. + ssr.curr = sr + + // Return file. + return sf, nil +} + +// nextFiles queues up a next file for all readers. +func (ssr *SnapshotsReader) nextFiles() error { + for i, sr := range ssr.readers { + if ssr.files[i] == nil { + // Read next file. 
+			sf, err := sr.Next()
+			if err == io.EOF {
+				ssr.files[i] = nil
+				continue
+			} else if err != nil {
+				return fmt.Errorf("next: reader=%d, err=%s", i, err)
+			}
+
+			// Cache file.
+			ssr.files[i] = &sf
+		}
+	}
+
+	return nil
+}
+
+// nextIndex returns the index of the next reader to read from.
+// Returns -1 if all readers are at EOF.
+func (ssr *SnapshotsReader) nextIndex() int {
+	// Find the next file by name, preferring the highest index when names match.
+	index := -1
+	for i, f := range ssr.files {
+		if f == nil {
+			continue
+		} else if index == -1 {
+			index = i
+		} else if f.Name < ssr.files[index].Name {
+			index = i
+		} else if f.Name == ssr.files[index].Name && f.Index > ssr.files[index].Index {
+			index = i
+		}
+	}
+	return index
+}
+
+// Read reads the current entry in the reader.
+func (ssr *SnapshotsReader) Read(b []byte) (n int, err error) {
+	if ssr.curr == nil {
+		return 0, io.EOF
+	}
+	return ssr.curr.Read(b)
+}
+
+// OpenFileSnapshotsReader returns a SnapshotsReader based on the path of the base snapshot.
+// Returns the underlying files which need to be closed separately.
+func OpenFileSnapshotsReader(path string) (*SnapshotsReader, []io.Closer, error) {
+	var readers []io.Reader
+	var closers []io.Closer
+	if err := func() error {
+		// Open original snapshot file.
+		f, err := os.Open(path)
+		if os.IsNotExist(err) {
+			return err
+		} else if err != nil {
+			return fmt.Errorf("open snapshot: %s", err)
+		}
+		readers = append(readers, f)
+		closers = append(closers, f)
+
+		// Open all incremental snapshots.
+		for i := 0; ; i++ {
+			filename := path + fmt.Sprintf(".%d", i)
+			f, err := os.Open(filename)
+			if os.IsNotExist(err) {
+				break
+			} else if err != nil {
+				return fmt.Errorf("open incremental snapshot: file=%s, err=%s", filename, err)
+			}
+			readers = append(readers, f)
+			closers = append(closers, f)
+		}
+
+		return nil
+	}(); err != nil {
+		closeAll(closers)
+		return nil, nil, err
+	}
+
+	return NewSnapshotsReader(readers...), closers, nil
+}
+
+// ReadFileSnapshot returns a Snapshot for a given base snapshot path.
+// This snapshot merges all incremental backup snapshots as well.
+func ReadFileSnapshot(path string) (*Snapshot, error) {
+	// Open a multi-snapshot reader.
+	ssr, files, err := OpenFileSnapshotsReader(path)
+	if os.IsNotExist(err) {
+		return nil, err
+	} else if err != nil {
+		return nil, fmt.Errorf("open file snapshots reader: %s", err)
+	}
+	defer closeAll(files)
+
+	// Read snapshot.
+	ss, err := ssr.Snapshot()
+	if err != nil {
+		return nil, fmt.Errorf("snapshot: %s", err)
+	}
+
+	return ss, nil
+}
+
+func closeAll(a []io.Closer) {
+	for _, c := range a {
+		_ = c.Close()
+	}
+}
+
 // SnapshotWriter writes a snapshot and the underlying files to disk as a tar archive.
 type SnapshotWriter struct {
 	// The snapshot to write from.
@@ -198,6 +452,10 @@ func (sw *SnapshotWriter) WriteTo(w io.Writer) (n int64, err error) {
 	// Close any file writers that aren't required.
 	sw.closeUnusedWriters()

+	// Sort snapshot files.
+	// This is required for combining multiple snapshots together.
+	sort.Sort(SnapshotFiles(sw.Snapshot.Files))
+
 	// Begin writing a tar file to the output.
 	tw := tar.NewWriter(w)
 	defer tw.Close()
diff --git a/snapshot_test.go b/snapshot_test.go
index fe3fd347f1e..c2fa151e8e9 100644
--- a/snapshot_test.go
+++ b/snapshot_test.go
@@ -76,6 +76,44 @@ func TestSnapshot_Diff(t *testing.T) {
 	}
 }

+// Ensure a snapshot can be merged so that the newest files from the two snapshots are returned.
+func TestSnapshot_Merge(t *testing.T) { + for i, tt := range []struct { + s *influxdb.Snapshot + other *influxdb.Snapshot + result *influxdb.Snapshot + }{ + // 0. Mixed higher, lower, equal indices. + { + s: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Size: 10, Index: 1}, + {Name: "b", Size: 10, Index: 10}, // keep: same, first + {Name: "c", Size: 10, Index: 21}, // keep: higher + {Name: "e", Size: 10, Index: 15}, // keep: higher + }}, + other: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Size: 20, Index: 2}, // keep: higher + {Name: "b", Size: 20, Index: 10}, + {Name: "c", Size: 20, Index: 11}, + {Name: "d", Size: 20, Index: 14}, // keep: new + {Name: "e", Size: 20, Index: 12}, + }}, + result: &influxdb.Snapshot{Files: []influxdb.SnapshotFile{ + {Name: "a", Size: 20, Index: 2}, + {Name: "b", Size: 10, Index: 10}, + {Name: "c", Size: 10, Index: 21}, + {Name: "d", Size: 20, Index: 14}, + {Name: "e", Size: 10, Index: 15}, + }}, + }, + } { + result := tt.s.Merge(tt.other) + if !reflect.DeepEqual(tt.result, result) { + t.Errorf("%d. mismatch:\n\nexp=%#v\n\ngot=%#v", i, tt.result, result) + } + } +} + // Ensure a snapshot writer can write a set of files to an archive func TestSnapshotWriter(t *testing.T) { // Create a new writer with a snapshot and file writers. @@ -163,6 +201,75 @@ func TestSnapshotWriter_CloseUnused(t *testing.T) { } } +// Ensure a SnapshotsReader can read from multiple snapshots. +func TestSnapshotsReader(t *testing.T) { + var sw *influxdb.SnapshotWriter + bufs := make([]bytes.Buffer, 2) + + // Snapshot #1 + sw = influxdb.NewSnapshotWriter() + sw.Snapshot.Files = []influxdb.SnapshotFile{ + {Name: "meta", Size: 3, Index: 12}, + {Name: "shards/1", Size: 5, Index: 15}, + } + sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("foo")} + sw.FileWriters["shards/1"] = &bufCloser{Buffer: *bytes.NewBufferString("55555")} + if _, err := sw.WriteTo(&bufs[0]); err != nil { + t.Fatal(err) + } else if err = sw.Close(); err != nil { + t.Fatal(err) + } + + // Snapshot #2 + sw = influxdb.NewSnapshotWriter() + sw.Snapshot.Files = []influxdb.SnapshotFile{ + {Name: "meta", Size: 3, Index: 20}, + {Name: "shards/2", Size: 6, Index: 30}, + } + sw.FileWriters["meta"] = &bufCloser{Buffer: *bytes.NewBufferString("bar")} + sw.FileWriters["shards/2"] = &bufCloser{Buffer: *bytes.NewBufferString("666666")} + if _, err := sw.WriteTo(&bufs[1]); err != nil { + t.Fatal(err) + } else if err = sw.Close(); err != nil { + t.Fatal(err) + } + + // Read and merge snapshots. + ssr := influxdb.NewSnapshotsReader(&bufs[0], &bufs[1]) + + // Next should be the second meta file. + if f, err := ssr.Next(); err != nil { + t.Fatalf("unexpected error(meta): %s", err) + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "meta", Size: 3, Index: 20}) { + t.Fatalf("file mismatch(meta): %#v", f) + } else if b := MustReadAll(ssr); string(b) != `bar` { + t.Fatalf("unexpected file(meta): %s", b) + } + + // Next should be shards/1. + if f, err := ssr.Next(); err != nil { + t.Fatalf("unexpected error(shards/1): %s", err) + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "shards/1", Size: 5, Index: 15}) { + t.Fatalf("file mismatch(shards/1): %#v", f) + } else if b := MustReadAll(ssr); string(b) != `55555` { + t.Fatalf("unexpected file(shards/1): %s", b) + } + + // Next should be shards/2. 
+ if f, err := ssr.Next(); err != nil { + t.Fatalf("unexpected error(shards/2): %s", err) + } else if !reflect.DeepEqual(f, influxdb.SnapshotFile{Name: "shards/2", Size: 6, Index: 30}) { + t.Fatalf("file mismatch(shards/2): %#v", f) + } else if b := MustReadAll(ssr); string(b) != `666666` { + t.Fatalf("unexpected file(shards/2): %s", b) + } + + // Check for end of snapshot. + if _, err := ssr.Next(); err != io.EOF { + t.Fatalf("expected EOF: %s", err) + } +} + // bufCloser adds a Close() method to a bytes.Buffer type bufCloser struct { bytes.Buffer From 4bc92c3018d5856efbf9d59d4fa3704eadab5e34 Mon Sep 17 00:00:00 2001 From: Ben Johnson Date: Tue, 24 Mar 2015 16:04:39 -0600 Subject: [PATCH 6/6] Code review fixes. --- cmd/influxd/backup.go | 2 +- cmd/influxd/backup_test.go | 2 +- cmd/influxd/restore.go | 5 ++++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cmd/influxd/backup.go b/cmd/influxd/backup.go index e49db9f1bbc..b8de924152d 100644 --- a/cmd/influxd/backup.go +++ b/cmd/influxd/backup.go @@ -177,6 +177,6 @@ backup downloads a snapshot of a data node and saves it to disk. -host The host to connect to snapshot. - Defaults to 127.0.0.1:8087. + Defaults to http://127.0.0.1:8087. `) } diff --git a/cmd/influxd/backup_test.go b/cmd/influxd/backup_test.go index 10d5d3e989e..e6a946b3d14 100644 --- a/cmd/influxd/backup_test.go +++ b/cmd/influxd/backup_test.go @@ -81,7 +81,7 @@ func TestBackupCommand_ErrPathRequired(t *testing.T) { // Ensure the backup returns an error if it cannot connect to the server. func TestBackupCommand_ErrConnectionRefused(t *testing.T) { - // Start and immediate stop a server so we have a dead port. + // Start and immediately stop a server so we have a dead port. s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})) s.Close() diff --git a/cmd/influxd/restore.go b/cmd/influxd/restore.go index dc4c26ed8c4..b6892f04569 100644 --- a/cmd/influxd/restore.go +++ b/cmd/influxd/restore.go @@ -77,7 +77,7 @@ func (cmd *RestoreCommand) Run(args ...string) error { } // Notify user of completion. - cmd.Logger.Println("restore complete") + cmd.Logger.Printf("restore complete using %s", path) return nil } @@ -132,6 +132,9 @@ func (cmd *RestoreCommand) unpack(path string, ssr *influxdb.SnapshotsReader) er return fmt.Errorf("next: entry=%s, err=%s", sf.Name, err) } + // Log progress. + cmd.Logger.Printf("unpacking: %s / idx=%d (%d bytes)", sf.Name, sf.Index, sf.Size) + // Create parent directory for output file. if err := os.MkdirAll(filepath.Dir(filepath.Join(path, sf.Name)), 0777); err != nil { return fmt.Errorf("mkdir: entry=%s, err=%s", sf.Name, err)