diff --git a/CHANGELOG.md b/CHANGELOG.md index a55551d943c..0e39ed5d1c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,9 @@ - [#1935](https://github.com/influxdb/influxdb/pull/1935): Implement stateless broker for Raft. - [#1936](https://github.com/influxdb/influxdb/pull/1936): Implement "SHOW STATS" and self-monitoring +### Features +- [#1909](https://github.com/influxdb/influxdb/pull/1909): Implement a dump command. + ## v0.9.0-rc11 [2015-03-13] ### Bugfixes diff --git a/client/influxdb.go b/client/influxdb.go index 6272cdc3680..e46a83f072f 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "net/url" "time" @@ -145,6 +146,30 @@ func (c *Client) Ping() (time.Duration, string, error) { return time.Since(now), version, nil } +func (c *Client) Dump(db string) (io.ReadCloser, error) { + u := c.url + u.Path = "dump" + values := u.Query() + values.Set("db", db) + values.Set("user", c.username) + values.Set("password", c.password) + u.RawQuery = values.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("User-Agent", c.userAgent) + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + return resp.Body, fmt.Errorf("HTTP Protocol error %d", resp.StatusCode) + } + return resp.Body, nil +} + // Structs // Result represents a resultset returned from a single statement. diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 31673e162cb..4fb180dce66 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -31,16 +31,17 @@ const ( ) type CommandLine struct { - Client *client.Client - Line *liner.State - Host string - Port int - Username string - Password string - Database string - Version string - Pretty bool // controls pretty print for json - Format string // controls the output format. Valid values are json, csv, or column + Client *client.Client + Line *liner.State + Host string + Port int + Username string + Password string + Database string + Version string + Pretty bool // controls pretty print for json + Format string // controls the output format. Valid values are json, csv, or column + ShouldDump bool } func main() { @@ -53,6 +54,7 @@ func main() { fs.StringVar(&c.Password, "password", c.Password, `password to connect to the server. Leaving blank will prompt for password (--password="")`) fs.StringVar(&c.Database, "database", c.Database, "database to connect to the server.") fs.StringVar(&c.Format, "output", default_format, "format specifies the format of the server responses: json, csv, or column") + fs.BoolVar(&c.ShouldDump, "dump", false, "dump the contents of the given database to stdout") fs.Parse(os.Args[1:]) var promptForPassword bool @@ -65,9 +67,6 @@ func main() { } } - // TODO Determine if we are an ineractive shell or running commands - fmt.Println("InfluxDB shell " + version) - c.Line = liner.NewLiner() defer c.Line.Close() @@ -82,6 +81,13 @@ func main() { c.connect("") + if c.ShouldDump { + c.dump() + return + } + + fmt.Println("InfluxDB shell " + version) + var historyFile string usr, err := user.Current() // Only load history if we can get the user @@ -205,7 +211,9 @@ func (c *CommandLine) connect(cmd string) { fmt.Printf("Failed to connect to %s\n", c.Client.Addr()) } else { c.Version = v - fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) + if !c.ShouldDump { + fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) + } } } @@ -249,6 +257,19 @@ func (c *CommandLine) SetFormat(cmd string) { } } +func (c *CommandLine) dump() { + response, err := c.Client.Dump(c.Database) + defer response.Close() + if err != nil { + fmt.Printf("Dump failed. %s\n", err) + } else { + _, err := io.Copy(os.Stdout, response) + if err != nil { + fmt.Printf("Dump failed. %s\n", err) + } + } +} + func (c *CommandLine) executeQuery(query string) { results, err := c.Client.Query(client.Query{Command: query, Database: c.Database}) if err != nil { diff --git a/httpd/handler.go b/httpd/handler.go index 3a0c4b86d0e..2cba2280598 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -1,6 +1,7 @@ package httpd import ( + "compress/gzip" "encoding/json" "errors" "fmt" @@ -16,8 +17,6 @@ import ( "sync/atomic" "time" - "compress/gzip" - "code.google.com/p/go-uuid/uuid" "github.com/bmizerany/pat" @@ -113,6 +112,10 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) "index", // Index. "GET", "/", true, true, h.serveIndex, }, + route{ + "dump", // export all points in the given db. + "GET", "/dump", true, true, h.serveDump, + }, ) for _, r := range h.routes { @@ -183,6 +186,132 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ httpResults(w, results, pretty) } +func interfaceToString(v interface{}) string { + switch t := v.(type) { + case nil: + return "" + case bool: + return fmt.Sprintf("%v", v) + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, uintptr: + return fmt.Sprintf("%d", t) + case float32, float64: + return fmt.Sprintf("%v", t) + default: + return fmt.Sprintf("%v", t) + } +} + +type Point struct { + Name string `json:"name"` + Timestamp time.Time `json:"timestamp"` + Tags map[string]string `json:"tags"` + Fields map[string]interface{} `json:"fields"` +} + +type Batch struct { + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Points []Point `json:"points"` +} + +// Return all the measurements from the given DB +func (h *Handler) showMeasurements(db string, user *influxdb.User) ([]string, error) { + var measurements []string + results := h.server.ExecuteQuery(&influxql.Query{[]influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user) + if results.Err != nil { + return measurements, results.Err + } + + for _, result := range results.Results { + for _, row := range result.Series { + for _, tuple := range (*row).Values { + for _, cell := range tuple { + measurements = append(measurements, interfaceToString(cell)) + } + } + } + } + return measurements, nil +} + +// serveDump returns all points in the given database as a plaintext list of JSON structs. +// To get all points: +// Find all measurements (show measurements). +// For each measurement do select * from group by * +func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influxdb.User) { + q := r.URL.Query() + db := q.Get("db") + pretty := q.Get("pretty") == "true" + delim := []byte("\n") + measurements, err := h.showMeasurements(db, user) + if err != nil { + httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError) + return + } + + // Fetch all the points for each measurement. + // From the 'select' query below, we get: + // + // columns:[col1, col2, col3, ...] + // - and - + // values:[[val1, val2, val3, ...], [val1, val2, val3, ...], [val1, val2, val3, ...]...] + // + // We need to turn that into multiple rows like so... + // fields:{col1 : values[0][0], col2 : values[0][1], col3 : values[0][2]} + // fields:{col1 : values[1][0], col2 : values[1][1], col3 : values[1][2]} + // fields:{col1 : values[2][0], col2 : values[2][1], col3 : values[2][2]} + // + for _, measurement := range measurements { + queryString := fmt.Sprintf("select * from %s group by *", measurement) + p := influxql.NewParser(strings.NewReader(queryString)) + query, err := p.ParseQuery() + if err != nil { + httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError) + return + } + // + results := h.server.ExecuteQuery(query, db, user) + for _, result := range results.Results { + for _, row := range result.Series { + points := make([]Point, 1) + var point Point + point.Name = row.Name + point.Tags = row.Tags + point.Fields = make(map[string]interface{}) + for _, tuple := range row.Values { + for subscript, cell := range tuple { + if row.Columns[subscript] == "time" { + point.Timestamp, _ = cell.(time.Time) + continue + } + point.Fields[row.Columns[subscript]] = cell + } + points[0] = point + batch := &Batch{ + Points: points, + Database: db, + RetentionPolicy: "default", + } + buf, err := json.Marshal(&batch) + + // TODO: Make this more legit in the future + // Since we're streaming data as chunked responses, this error could + // be in the middle of an already-started data stream. Until Go 1.5, + // we can't really support proper trailer headers, so we'll just + // wait until then: https://code.google.com/p/go/issues/detail?id=7759 + if err != nil { + w.Write([]byte("*** SERVER-SIDE ERROR. MISSING DATA ***")) + w.Write(delim) + return + } + w.Write(buf) + w.Write(delim) + } + } + } + } +} + // serveWrite receives incoming series data and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { var bp client.BatchPoints diff --git a/httpd/handler_test.go b/httpd/handler_test.go index ec1b5419e6a..1f5b987e993 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -278,8 +278,8 @@ func TestHandler_RetentionPolicies(t *testing.T) { if status != http.StatusOK { t.Fatalf("unexpected status: %d", status) - } else if body != `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["bar","168h0m0s",1,false]]}]}]}` { - t.Fatalf("unexpected body: %s", body) + } else if !strings.Contains(body, `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true],["bar","168h0m0s",1,false]]}]}]}`) { + t.Fatalf("Missing retention policy: %s", body) } } @@ -1118,16 +1118,54 @@ func TestHandler_DropSeries(t *testing.T) { func TestHandler_serveWriteSeries(t *testing.T) { c := test.NewMessagingClient() defer c.Close() - srvr := OpenAuthenticatedServer(c) + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") - srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) defer s.Close() - status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","fields": {"value": 100}}]}`) + status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "default", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","fields": {"value": 100}}]}`) if status != http.StatusOK { - t.Fatalf("unexpected status: %d", status) + t.Fatalf("unexpected status for post: %d", status) + } + query := map[string]string{"db": "foo", "q": "select * from cpu"} + status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "") + if status != http.StatusOK { + t.Fatalf("unexpected status for get: %d", status) + } + if !strings.Contains(body, `"name":"cpu"`) { + t.Fatalf("Write doesn't match query results. Response body is %s.", body) + } +} + +func TestHandler_serveDump(t *testing.T) { + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) + srvr.CreateDatabase("foo") + s := NewHTTPServer(srvr) + defer s.Close() + + status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "default", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","fields": {"value": 100}}]}`) + + if status != http.StatusOK { + t.Fatalf("unexpected status for post: %d", status) + + } + query := map[string]string{"db": "foo", "q": "select * from cpu"} + status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "") + if status != http.StatusOK { + t.Fatalf("unexpected status for get: %d", status) + } + + time.Sleep(500 * time.Millisecond) // Shouldn't committed data be readable? + query = map[string]string{"db": "foo"} + status, body = MustHTTP("GET", s.URL+`/dump`, query, nil, "") + if status != http.StatusOK { + t.Fatalf("unexpected status for get: %d", status) + } + if !strings.Contains(body, `"name":"cpu"`) { + t.Fatalf("Write doesn't match query results. Response body is %s.", body) } } @@ -1501,6 +1539,7 @@ func MustHTTP(verb, path string, params, headers map[string]string, body string) b, err := ioutil.ReadAll(resp.Body) return resp.StatusCode, strings.TrimRight(string(b), "\n") + } // MustParseURL parses a string into a URL. Panic on error. @@ -1539,7 +1578,9 @@ type Server struct { // NewServer returns a new test server instance. func NewServer() *Server { - return &Server{influxdb.NewServer()} + s := &Server{influxdb.NewServer()} + s.RetentionAutoCreate = true + return s } // OpenAuthenticatedServer returns a new, open test server instance with authentication enabled.