From 3193e2129b1f2362d9065d55bef697f68b5c21a6 Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Tue, 10 Mar 2015 16:30:43 -0700 Subject: [PATCH 01/13] make a dump command --- cmd/influx/main.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 3afdb5ccd1e..b69b899d8ed 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -140,6 +140,8 @@ func (c *CommandLine) ParseCommand(cmd string) bool { } case strings.HasPrefix(lcmd, "use"): c.use(cmd) + case strings.HasPrefix(lcmd, "dump"): + c.dump() case lcmd == "": break default: @@ -248,6 +250,31 @@ func (c *CommandLine) SetFormat(cmd string) { } } +func (c *CommandLine) dump() { + query := "show measurements" + results, err := c.Client.Query(client.Query{Command: query, Database: c.Database}) + if err != nil { + fmt.Printf("ERR: %s\n", err) + return + } + measurements := fetchRows(results) + for _, i := range measurements { + query = fmt.Sprintf("select * from %s group by *", i[2]) + results, err = c.Client.Query(client.Query{Command: query, Database: c.Database}) + if err != nil { + fmt.Printf("ERR: %s\n", err) + return + } + selectStar := fetchRows(results) + for _, j := range selectStar { + for _, k := range j { + fmt.Printf("%s ", k) + } + fmt.Println() + } + } +} + func (c *CommandLine) executeQuery(query string) { results, err := c.Client.Query(client.Query{Command: query, Database: c.Database}) if err != nil { @@ -317,6 +344,31 @@ func WriteColumns(results *client.Results, w io.Writer) { } } +func fetchRows(results *client.Results) [][]string { + var rows [][]string + for _, result := range results.Results { + for _, row := range result.Series { + // gather tags + tags := []string{} + for k, v := range row.Tags { + tags = append(tags, fmt.Sprintf("%s=%s", k, v)) + } + thisRow := []string{} + for _, v := range row.Values { + thisRow = append(thisRow, row.Name) + thisRow = append(thisRow, strings.Join(tags, ",")) + + for _, vv := range v { + thisRow = append(thisRow, interfaceToString(vv)) + } + rows = append(rows, thisRow) + thisRow = []string{} + } + } + } + return rows +} + func resultToCSV(result client.Result, seperator string, headerLines bool) []string { rows := []string{} // Create a tabbed writer for each result a they won't always line up From 12760b9023c2b7e314d60835e8fa17664d6ff951 Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Wed, 11 Mar 2015 16:54:17 -0700 Subject: [PATCH 02/13] turn a 'Row' into distinct points --- cmd/influx/main.go | 101 ++++++++++++++++++++++++++++++--------------- 1 file changed, 68 insertions(+), 33 deletions(-) diff --git a/cmd/influx/main.go b/cmd/influx/main.go index b69b899d8ed..f9b191743c1 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -13,8 +13,10 @@ import ( "strconv" "strings" "text/tabwriter" + "time" "github.com/influxdb/influxdb/client" + // "github.com/influxdb/influxdb/influxql" "github.com/peterh/liner" ) @@ -250,27 +252,85 @@ func (c *CommandLine) SetFormat(cmd string) { } } +type Point struct { + Name string `json:"name"` + Timestamp time.Time `json:"timestamp"` + Tags map[string]string `json:"tags"` + Fields map[string]interface{} `json:"fields"` +} + +type Batch struct { + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Points []Point `json:"points"` +} + +// Output each point in the database in JSON. +// To get all points: +// Find all measurements (show measurements). +// For each measurement do select * from group by * func (c *CommandLine) dump() { + // Make our list of measurements + var measurements []string query := "show measurements" results, err := c.Client.Query(client.Query{Command: query, Database: c.Database}) if err != nil { fmt.Printf("ERR: %s\n", err) return } - measurements := fetchRows(results) - for _, i := range measurements { - query = fmt.Sprintf("select * from %s group by *", i[2]) + for _, measurementResult := range results.Results { + for _, measurementRow := range measurementResult.Series { + for _, measurementTuple := range measurementRow.Values { + for _, measurementCell := range measurementTuple { + measurements = append(measurements, interfaceToString(measurementCell)) + } + } + } + } + // Fetch all the points for each measurement found above. + // From the 'select' query below, we get: + // + // columns:[col1, col2, col3, ...] + // - and - + // values:[[val1, val2, val3, ...], [val1, val2, val3, ...], [val1, val2, val3, ...]...] + // + // We need to turn that into multiple rows like so... + // fields:{col1 : values[0][0], col2 : values[0][1], col3 : values[0][2]} + // fields:{col1 : values[1][0], col2 : values[1][1], col3 : values[1][2]} + // fields:{col1 : values[2][0], col2 : values[2][1], col3 : values[2][2]} + // + for _, measurement := range measurements { + query = fmt.Sprintf("select * from %s group by *", measurement) results, err = c.Client.Query(client.Query{Command: query, Database: c.Database}) if err != nil { fmt.Printf("ERR: %s\n", err) return } - selectStar := fetchRows(results) - for _, j := range selectStar { - for _, k := range j { - fmt.Printf("%s ", k) + for _, result := range results.Results { + for _, row := range result.Series { + points := make([]Point, 1) + var point Point + point.Name = row.Name + point.Tags = row.Tags + point.Fields = make(map[string]interface{}) + for _, tuple := range row.Values { + for subscript, cell := range tuple { + if row.Columns[subscript] == "time" { + point.Timestamp, _ = time.Parse(time.RFC3339, interfaceToString(cell)) + continue + } + point.Fields[row.Columns[subscript]] = cell + } + points[0] = point + batch := &Batch{ + Database: "db", + RetentionPolicy: "raw", + Points: points, + } + buf, _ := json.Marshal(&batch) + fmt.Printf("%s\n", buf) + } } - fmt.Println() } } } @@ -344,31 +404,6 @@ func WriteColumns(results *client.Results, w io.Writer) { } } -func fetchRows(results *client.Results) [][]string { - var rows [][]string - for _, result := range results.Results { - for _, row := range result.Series { - // gather tags - tags := []string{} - for k, v := range row.Tags { - tags = append(tags, fmt.Sprintf("%s=%s", k, v)) - } - thisRow := []string{} - for _, v := range row.Values { - thisRow = append(thisRow, row.Name) - thisRow = append(thisRow, strings.Join(tags, ",")) - - for _, vv := range v { - thisRow = append(thisRow, interfaceToString(vv)) - } - rows = append(rows, thisRow) - thisRow = []string{} - } - } - } - return rows -} - func resultToCSV(result client.Result, seperator string, headerLines bool) []string { rows := []string{} // Create a tabbed writer for each result a they won't always line up From 6f86cac5d857a3c7cdc9d30c1af3bb8b10fa2955 Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Thu, 12 Mar 2015 17:14:53 -0700 Subject: [PATCH 03/13] move to http handler --- httpd/handler.go | 115 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) diff --git a/httpd/handler.go b/httpd/handler.go index ffc31bbe9f5..56454ac9f60 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -1,6 +1,7 @@ package httpd import ( + "bytes" "encoding/json" "errors" "fmt" @@ -113,6 +114,10 @@ func NewHandler(s *influxdb.Server, requireAuthentication bool, version string) "index", // Index. "GET", "/", true, true, h.serveIndex, }, + route{ + "dump", // export all points in the given db. + "GET", "/dump", true, true, h.serveDump, + }, ) for _, r := range h.routes { @@ -175,6 +180,116 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ httpResults(w, results, pretty) } +type Point struct { + Name string `json:"name"` + Timestamp time.Time `json:"timestamp"` + Tags map[string]string `json:"tags"` + Fields map[string]interface{} `json:"fields"` +} + +type Batch struct { + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Points []Point `json:"points"` +} + +func interfaceToString(v interface{}) string { + switch t := v.(type) { + case nil: + return "" + case bool: + return fmt.Sprintf("%v", v) + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, uintptr: + return fmt.Sprintf("%d", t) + case float32, float64: + return fmt.Sprintf("%v", t) + default: + return fmt.Sprintf("%v", t) + } +} + +// serveDump returns all points in the given database as a plaintext list of JSON structs. +// To get all points: +// Find all measurements (show measurements). +// For each measurement do select * from group by * +func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influxdb.User) { + q := r.URL.Query() + db := q.Get("db") + pretty := q.Get("pretty") == "true" + var httpOut bytes.Buffer + + // Make our list of measurements + var measurements []string + queryString := "show measurements" + p := influxql.NewParser(strings.NewReader(queryString)) + query, err := p.ParseQuery() + if err != nil { + httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest) + return + } + results := h.server.ExecuteQuery(query, db, user) + for _, measurementResult := range results.Results { + for _, measurementRow := range measurementResult.Series { + for _, measurementTuple := range (*measurementRow).Values { + for _, measurementCell := range measurementTuple { + measurements = append(measurements, interfaceToString(measurementCell)) + } + } + } + } + // Fetch all the points for each measurement found above. + // From the 'select' query below, we get: + // + // columns:[col1, col2, col3, ...] + // - and - + // values:[[val1, val2, val3, ...], [val1, val2, val3, ...], [val1, val2, val3, ...]...] + // + // We need to turn that into multiple rows like so... + // fields:{col1 : values[0][0], col2 : values[0][1], col3 : values[0][2]} + // fields:{col1 : values[1][0], col2 : values[1][1], col3 : values[1][2]} + // fields:{col1 : values[2][0], col2 : values[2][1], col3 : values[2][2]} + // + for _, measurement := range measurements { + queryString = fmt.Sprintf("select * from %s group by *", measurement) + p := influxql.NewParser(strings.NewReader(queryString)) + query, err := p.ParseQuery() + if err != nil { + httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest) + return + } + results := h.server.ExecuteQuery(query, db, user) + for _, result := range results.Results { + for _, row := range result.Series { + points := make([]Point, 1) + var point Point + point.Name = row.Name + point.Tags = row.Tags + point.Fields = make(map[string]interface{}) + for _, tuple := range row.Values { + for subscript, cell := range tuple { + if row.Columns[subscript] == "time" { + point.Timestamp, _ = time.Parse(time.RFC3339, interfaceToString(cell)) + continue + } + point.Fields[row.Columns[subscript]] = cell + } + points[0] = point + batch := &Batch{ + Database: db, + RetentionPolicy: "default", + Points: points, + } + buf, _ := json.Marshal(&batch) + httpOut.Write(buf) + httpOut.Write([]byte("\n")) + } + } + } + } + w.Header().Add("content-type", "text/plain") + w.Write(httpOut.Bytes()) +} + // serveWrite receives incoming series data and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { var bp client.BatchPoints From 4aabcd907c9b3c76727057046a285fd1b44d3b50 Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Fri, 13 Mar 2015 15:37:23 -0700 Subject: [PATCH 04/13] Dump cmd Issue: 1909 move dump cmd to an http endpoint. Add dump cmd on client to call the endpoint. --- client/influxdb.go | 21 +++++++++ cmd/influx/main.go | 104 +++++++++++---------------------------------- httpd/handler.go | 52 ++++++++++++++--------- 3 files changed, 76 insertions(+), 101 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index 6272cdc3680..17ae19df360 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -145,6 +145,27 @@ func (c *Client) Ping() (time.Duration, string, error) { return time.Since(now), version, nil } +func (c *Client) Dump(db string) (*http.Response, error) { + u := c.url + u.Path = "dump" + values := u.Query() + values.Set("db", db) + values.Set("user", c.username) + values.Set("password", c.password) + u.RawQuery = values.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return nil, err + } + req.Header.Set("User-Agent", c.userAgent) + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + return resp, nil +} + // Structs // Result represents a resultset returned from a single statement. diff --git a/cmd/influx/main.go b/cmd/influx/main.go index f9b191743c1..aed73014e94 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "encoding/csv" "encoding/json" "flag" @@ -13,7 +14,6 @@ import ( "strconv" "strings" "text/tabwriter" - "time" "github.com/influxdb/influxdb/client" // "github.com/influxdb/influxdb/influxql" @@ -42,6 +42,7 @@ type CommandLine struct { Version string Pretty bool // controls pretty print for json Format string // controls the output format. Valid values are json, csv, or column + Dump bool } func main() { @@ -54,8 +55,14 @@ func main() { fs.StringVar(&c.Password, "password", c.Password, `password to connect to the server. Leaving blank will prompt for password (--password="")`) fs.StringVar(&c.Database, "database", c.Database, "database to connect to the server.") fs.StringVar(&c.Format, "output", default_format, "format specifies the format of the server responses: json, csv, or column") + fs.BoolVar(&c.Dump, "dump", false, "dump the contents of the given database to stdout") fs.Parse(os.Args[1:]) + if c.Dump { + c.connect("") + c.dump() + } + var promptForPassword bool // determine if they set the password flag but provided no value for _, v := range os.Args { @@ -142,8 +149,6 @@ func (c *CommandLine) ParseCommand(cmd string) bool { } case strings.HasPrefix(lcmd, "use"): c.use(cmd) - case strings.HasPrefix(lcmd, "dump"): - c.dump() case lcmd == "": break default: @@ -208,7 +213,9 @@ func (c *CommandLine) connect(cmd string) { fmt.Printf("Failed to connect to %s\n", c.Client.Addr()) } else { c.Version = v - fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) + if !c.Dump { + fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) + } } } @@ -252,87 +259,24 @@ func (c *CommandLine) SetFormat(cmd string) { } } -type Point struct { - Name string `json:"name"` - Timestamp time.Time `json:"timestamp"` - Tags map[string]string `json:"tags"` - Fields map[string]interface{} `json:"fields"` -} - -type Batch struct { - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Points []Point `json:"points"` -} - -// Output each point in the database in JSON. -// To get all points: -// Find all measurements (show measurements). -// For each measurement do select * from group by * func (c *CommandLine) dump() { - // Make our list of measurements - var measurements []string - query := "show measurements" - results, err := c.Client.Query(client.Query{Command: query, Database: c.Database}) + fmt.Printf("db is %s\n", c.Database) + response, err := c.Client.Dump(c.Database) + defer response.Body.Close() if err != nil { - fmt.Printf("ERR: %s\n", err) - return - } - for _, measurementResult := range results.Results { - for _, measurementRow := range measurementResult.Series { - for _, measurementTuple := range measurementRow.Values { - for _, measurementCell := range measurementTuple { - measurements = append(measurements, interfaceToString(measurementCell)) - } - } - } - } - // Fetch all the points for each measurement found above. - // From the 'select' query below, we get: - // - // columns:[col1, col2, col3, ...] - // - and - - // values:[[val1, val2, val3, ...], [val1, val2, val3, ...], [val1, val2, val3, ...]...] - // - // We need to turn that into multiple rows like so... - // fields:{col1 : values[0][0], col2 : values[0][1], col3 : values[0][2]} - // fields:{col1 : values[1][0], col2 : values[1][1], col3 : values[1][2]} - // fields:{col1 : values[2][0], col2 : values[2][1], col3 : values[2][2]} - // - for _, measurement := range measurements { - query = fmt.Sprintf("select * from %s group by *", measurement) - results, err = c.Client.Query(client.Query{Command: query, Database: c.Database}) - if err != nil { - fmt.Printf("ERR: %s\n", err) - return + fmt.Fprintf(os.Stderr, "Failed to dump database %s from %s\n", c.Database, c.Client.Addr()) + os.Exit(1) + } else { + scanner := bufio.NewScanner(response.Body) + for scanner.Scan() { + fmt.Println(scanner.Text()) } - for _, result := range results.Results { - for _, row := range result.Series { - points := make([]Point, 1) - var point Point - point.Name = row.Name - point.Tags = row.Tags - point.Fields = make(map[string]interface{}) - for _, tuple := range row.Values { - for subscript, cell := range tuple { - if row.Columns[subscript] == "time" { - point.Timestamp, _ = time.Parse(time.RFC3339, interfaceToString(cell)) - continue - } - point.Fields[row.Columns[subscript]] = cell - } - points[0] = point - batch := &Batch{ - Database: "db", - RetentionPolicy: "raw", - Points: points, - } - buf, _ := json.Marshal(&batch) - fmt.Printf("%s\n", buf) - } - } + if err := scanner.Err(); err != nil { + fmt.Fprintln(os.Stderr, "Failed to dump database %s", err) + os.Exit(1) } } + os.Exit(0) } func (c *CommandLine) executeQuery(query string) { diff --git a/httpd/handler.go b/httpd/handler.go index 56454ac9f60..6435a1a9d03 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -1,7 +1,6 @@ package httpd import ( - "bytes" "encoding/json" "errors" "fmt" @@ -208,26 +207,20 @@ func interfaceToString(v interface{}) string { } } -// serveDump returns all points in the given database as a plaintext list of JSON structs. -// To get all points: -// Find all measurements (show measurements). -// For each measurement do select * from group by * -func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influxdb.User) { - q := r.URL.Query() - db := q.Get("db") - pretty := q.Get("pretty") == "true" - var httpOut bytes.Buffer - - // Make our list of measurements +// Return all the measurements from the given DB +func (h *Handler) showMeasurements(db string, user *influxdb.User) (error, []string) { var measurements []string queryString := "show measurements" p := influxql.NewParser(strings.NewReader(queryString)) query, err := p.ParseQuery() if err != nil { - httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest) - return + return err, measurements } results := h.server.ExecuteQuery(query, db, user) + if results.Err != nil { + return results.Err, measurements + + } for _, measurementResult := range results.Results { for _, measurementRow := range measurementResult.Series { for _, measurementTuple := range (*measurementRow).Values { @@ -237,7 +230,25 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx } } } - // Fetch all the points for each measurement found above. + return nil, measurements +} + +// serveDump returns all points in the given database as a plaintext list of JSON structs. +// To get all points: +// Find all measurements (show measurements). +// For each measurement do select * from group by * +func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influxdb.User) { + q := r.URL.Query() + db := q.Get("db") + delim := []byte("\n") + pretty := q.Get("pretty") == "true" + err, measurements := h.showMeasurements(db, user) + if err != nil { + httpError(w, "error with dump: "+err.Error(), pretty, http.StatusBadRequest) + return + } + + // Fetch all the points for each measurement. // From the 'select' query below, we get: // // columns:[col1, col2, col3, ...] @@ -250,13 +261,14 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx // fields:{col1 : values[2][0], col2 : values[2][1], col3 : values[2][2]} // for _, measurement := range measurements { - queryString = fmt.Sprintf("select * from %s group by *", measurement) + queryString := fmt.Sprintf("select * from %s group by *", measurement) p := influxql.NewParser(strings.NewReader(queryString)) query, err := p.ParseQuery() if err != nil { - httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest) + httpError(w, "error with dump: "+err.Error(), pretty, http.StatusBadRequest) return } + // results := h.server.ExecuteQuery(query, db, user) for _, result := range results.Results { for _, row := range result.Series { @@ -280,14 +292,12 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx Points: points, } buf, _ := json.Marshal(&batch) - httpOut.Write(buf) - httpOut.Write([]byte("\n")) + w.Write(buf) + w.Write(delim) } } } } - w.Header().Add("content-type", "text/plain") - w.Write(httpOut.Bytes()) } // serveWrite receives incoming series data and writes it to the database. From a342a1e3fd6f85765dc8a535adb4f18a5f7a5cd2 Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Fri, 13 Mar 2015 16:11:31 -0700 Subject: [PATCH 05/13] remove commented line --- cmd/influx/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/influx/main.go b/cmd/influx/main.go index acdb5c53cff..00e7c248f2e 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -17,7 +17,6 @@ import ( "text/tabwriter" "github.com/influxdb/influxdb/client" - // "github.com/influxdb/influxdb/influxql" "github.com/peterh/liner" ) From d7420683ae8d403d977bda111fcbe303106aefaf Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Mon, 16 Mar 2015 15:31:41 -0700 Subject: [PATCH 06/13] tweaking in response to various PR comments --- client/influxdb.go | 15 +++++----- cmd/influx/main.go | 22 ++++++-------- httpd/handler.go | 72 ++++++++++++++++++++++------------------------ 3 files changed, 52 insertions(+), 57 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index 17ae19df360..17ff0145cad 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "net/http" "net/url" "time" @@ -145,7 +146,7 @@ func (c *Client) Ping() (time.Duration, string, error) { return time.Since(now), version, nil } -func (c *Client) Dump(db string) (*http.Response, error) { +func (c *Client) Dump(db string) (io.ReadCloser, error) { u := c.url u.Path = "dump" values := u.Query() @@ -163,7 +164,7 @@ func (c *Client) Dump(db string) (*http.Response, error) { if err != nil { return nil, err } - return resp, nil + return resp.Body, nil } // Structs @@ -273,11 +274,11 @@ func (a Results) Error() error { // Precision can be specified if the timestamp is in epoch format (integer). // Valid values for Precision are n, u, ms, s, m, and h type Point struct { - Name string - Tags map[string]string - Timestamp time.Time - Fields map[string]interface{} - Precision string + Name string `json:"name,omitempty"` + Tags map[string]string `json:"tags,omitempty"` + Timestamp time.Time `json:"timestamp,omitempty"` //XXX omitempty doesn't work on time.Time + Fields map[string]interface{} `json:"fields,omitempty"` + Precision string `json:"precision,omitempty"` } // MarshalJSON will format the time in RFC3339Nano diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 00e7c248f2e..a084d75887a 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -58,11 +58,6 @@ func main() { fs.BoolVar(&c.Dump, "dump", false, "dump the contents of the given database to stdout") fs.Parse(os.Args[1:]) - if c.Dump { - c.connect("") - c.dump() - } - var promptForPassword bool // determine if they set the password flag but provided no value for _, v := range os.Args { @@ -90,6 +85,11 @@ func main() { c.connect("") + if c.Dump { + c.dump() + return + } + var historyFile string usr, err := user.Current() // Only load history if we can get the user @@ -260,23 +260,19 @@ func (c *CommandLine) SetFormat(cmd string) { } func (c *CommandLine) dump() { - fmt.Printf("db is %s\n", c.Database) response, err := c.Client.Dump(c.Database) - defer response.Body.Close() + defer response.Close() if err != nil { - fmt.Fprintf(os.Stderr, "Failed to dump database %s from %s\n", c.Database, c.Client.Addr()) - os.Exit(1) + fmt.Printf("Dump failed. %s\n", err) } else { - scanner := bufio.NewScanner(response.Body) + scanner := bufio.NewScanner(response) for scanner.Scan() { fmt.Println(scanner.Text()) } if err := scanner.Err(); err != nil { - fmt.Fprintln(os.Stderr, "Failed to dump database %s", err) - os.Exit(1) + fmt.Printf("Dump failed. %s\n", err) } } - os.Exit(0) } func (c *CommandLine) executeQuery(query string) { diff --git a/httpd/handler.go b/httpd/handler.go index 6435a1a9d03..5c51f927535 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -1,6 +1,7 @@ package httpd import ( + "compress/gzip" "encoding/json" "errors" "fmt" @@ -16,8 +17,6 @@ import ( "sync/atomic" "time" - "compress/gzip" - "code.google.com/p/go-uuid/uuid" "github.com/bmizerany/pat" @@ -179,19 +178,6 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ httpResults(w, results, pretty) } -type Point struct { - Name string `json:"name"` - Timestamp time.Time `json:"timestamp"` - Tags map[string]string `json:"tags"` - Fields map[string]interface{} `json:"fields"` -} - -type Batch struct { - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Points []Point `json:"points"` -} - func interfaceToString(v interface{}) string { switch t := v.(type) { case nil: @@ -207,30 +193,37 @@ func interfaceToString(v interface{}) string { } } +type Point struct { + Name string `json:"name"` + Timestamp time.Time `json:"timestamp"` + Tags map[string]string `json:"tags"` + Fields map[string]interface{} `json:"fields"` +} + +type Batch struct { + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Points []Point `json:"points"` +} + // Return all the measurements from the given DB -func (h *Handler) showMeasurements(db string, user *influxdb.User) (error, []string) { +func (h *Handler) showMeasurements(db string, user *influxdb.User) ([]string, error) { var measurements []string - queryString := "show measurements" - p := influxql.NewParser(strings.NewReader(queryString)) - query, err := p.ParseQuery() - if err != nil { - return err, measurements - } - results := h.server.ExecuteQuery(query, db, user) + results := h.server.ExecuteQuery(&influxql.Query{[]influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user) if results.Err != nil { - return results.Err, measurements - + return measurements, results.Err } - for _, measurementResult := range results.Results { - for _, measurementRow := range measurementResult.Series { - for _, measurementTuple := range (*measurementRow).Values { - for _, measurementCell := range measurementTuple { - measurements = append(measurements, interfaceToString(measurementCell)) + + for _, result := range results.Results { + for _, row := range result.Series { + for _, tuple := range (*row).Values { + for _, cell := range tuple { + measurements = append(measurements, interfaceToString(cell)) } } } } - return nil, measurements + return measurements, nil } // serveDump returns all points in the given database as a plaintext list of JSON structs. @@ -242,9 +235,9 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx db := q.Get("db") delim := []byte("\n") pretty := q.Get("pretty") == "true" - err, measurements := h.showMeasurements(db, user) + measurements, err := h.showMeasurements(db, user) if err != nil { - httpError(w, "error with dump: "+err.Error(), pretty, http.StatusBadRequest) + httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError) return } @@ -265,7 +258,7 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx p := influxql.NewParser(strings.NewReader(queryString)) query, err := p.ParseQuery() if err != nil { - httpError(w, "error with dump: "+err.Error(), pretty, http.StatusBadRequest) + httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError) return } // @@ -280,18 +273,23 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx for _, tuple := range row.Values { for subscript, cell := range tuple { if row.Columns[subscript] == "time" { - point.Timestamp, _ = time.Parse(time.RFC3339, interfaceToString(cell)) + point.Timestamp, _ = cell.(time.Time) continue } point.Fields[row.Columns[subscript]] = cell } points[0] = point batch := &Batch{ + Points: points, Database: db, RetentionPolicy: "default", - Points: points, } - buf, _ := json.Marshal(&batch) + buf, err := json.Marshal(&batch) + + if err != nil { + httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError) + return + } w.Write(buf) w.Write(delim) } From 995475118203043b2d2d12b468eef1a33c77f36e Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Tue, 17 Mar 2015 13:47:35 -0700 Subject: [PATCH 07/13] fix default retention not set --- httpd/handler.go | 2 +- httpd/handler_test.go | 54 +++++++++++++++++++++++++++++++++++++------ 2 files changed, 48 insertions(+), 8 deletions(-) diff --git a/httpd/handler.go b/httpd/handler.go index b7086cd6d63..3b1687d2385 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -241,8 +241,8 @@ func (h *Handler) showMeasurements(db string, user *influxdb.User) ([]string, er func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influxdb.User) { q := r.URL.Query() db := q.Get("db") - delim := []byte("\n") pretty := q.Get("pretty") == "true" + delim := []byte("\n") measurements, err := h.showMeasurements(db, user) if err != nil { httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError) diff --git a/httpd/handler_test.go b/httpd/handler_test.go index ec1b5419e6a..014a0abccbc 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -278,8 +278,8 @@ func TestHandler_RetentionPolicies(t *testing.T) { if status != http.StatusOK { t.Fatalf("unexpected status: %d", status) - } else if body != `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["bar","168h0m0s",1,false]]}]}]}` { - t.Fatalf("unexpected body: %s", body) + } else if !strings.Contains(body, `{"results":[{"series":[{"columns":["name","duration","replicaN","default"],"values":[["default","0",1,true],["bar","168h0m0s",1,false]]}]}]}`) { + t.Fatalf("Missing retention policy: %s", body) } } @@ -1118,16 +1118,53 @@ func TestHandler_DropSeries(t *testing.T) { func TestHandler_serveWriteSeries(t *testing.T) { c := test.NewMessagingClient() defer c.Close() - srvr := OpenAuthenticatedServer(c) + srvr := OpenAuthlessServer(c) srvr.CreateDatabase("foo") - srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) s := NewHTTPServer(srvr) defer s.Close() - status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","fields": {"value": 100}}]}`) + status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "default", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","fields": {"value": 100}}]}`) if status != http.StatusOK { - t.Fatalf("unexpected status: %d", status) + t.Fatalf("unexpected status for post: %d", status) + } + query := map[string]string{"db": "foo", "q": "select * from cpu"} + status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "") + if status != http.StatusOK { + t.Fatalf("unexpected status for get: %d", status) + } + if !strings.Contains(body, `"name":"cpu"`) { + t.Fatalf("Write doesn't match query results. Response body is %s.", body) + } +} + +func TestHandler_serveDump(t *testing.T) { + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) + srvr.CreateDatabase("foo") + s := NewHTTPServer(srvr) + defer s.Close() + + status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "default", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","fields": {"value": 100}}]}`) + + if status != http.StatusOK { + t.Fatalf("unexpected status for post: %d", status) + + } + query := map[string]string{"db": "foo", "q": "select * from cpu"} + status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "") + if status != http.StatusOK { + t.Fatalf("unexpected status for get: %d", status) + } + + query = map[string]string{"db": "foo"} + status, body = MustHTTP("GET", s.URL+`/dump`, query, nil, "") + if status != http.StatusOK { + t.Fatalf("unexpected status for get: %d", status) + } + if !strings.Contains(body, `"name":"cpu"`) { + t.Fatalf("Write doesn't match query results. Response body is %s.", body) } } @@ -1501,6 +1538,7 @@ func MustHTTP(verb, path string, params, headers map[string]string, body string) b, err := ioutil.ReadAll(resp.Body) return resp.StatusCode, strings.TrimRight(string(b), "\n") + } // MustParseURL parses a string into a URL. Panic on error. @@ -1539,7 +1577,9 @@ type Server struct { // NewServer returns a new test server instance. func NewServer() *Server { - return &Server{influxdb.NewServer()} + s := &Server{influxdb.NewServer()} + s.RetentionAutoCreate = true + return s } // OpenAuthenticatedServer returns a new, open test server instance with authentication enabled. From 9b3f0399af9e69723ce0a414ecf4b6fbf72d33f0 Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Tue, 17 Mar 2015 13:57:47 -0700 Subject: [PATCH 08/13] oops, left in some cruft --- cmd/influx/main.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/influx/main.go b/cmd/influx/main.go index a084d75887a..11d7db3875c 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -213,9 +213,7 @@ func (c *CommandLine) connect(cmd string) { fmt.Printf("Failed to connect to %s\n", c.Client.Addr()) } else { c.Version = v - if !c.Dump { - fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) - } + fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) } } From 5df3e8c3aacfebdb7382166f440be115acd1db73 Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Tue, 17 Mar 2015 14:44:15 -0700 Subject: [PATCH 09/13] suppress more client msgs --- cmd/influx/main.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 11d7db3875c..5ef8aa65d5a 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -68,9 +68,6 @@ func main() { } } - // TODO Determine if we are an ineractive shell or running commands - fmt.Println("InfluxDB shell " + version) - c.Line = liner.NewLiner() defer c.Line.Close() @@ -90,6 +87,8 @@ func main() { return } + fmt.Println("InfluxDB shell " + version) + var historyFile string usr, err := user.Current() // Only load history if we can get the user @@ -213,7 +212,9 @@ func (c *CommandLine) connect(cmd string) { fmt.Printf("Failed to connect to %s\n", c.Client.Addr()) } else { c.Version = v - fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) + if !c.Dump { + fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) + } } } From 9063312636c2c3663d88d8ff0e2d6e7d95c5225a Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Tue, 17 Mar 2015 15:27:33 -0700 Subject: [PATCH 10/13] cory sez to remove tags --- client/influxdb.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index 17ff0145cad..b4872f8ab23 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -274,11 +274,11 @@ func (a Results) Error() error { // Precision can be specified if the timestamp is in epoch format (integer). // Valid values for Precision are n, u, ms, s, m, and h type Point struct { - Name string `json:"name,omitempty"` - Tags map[string]string `json:"tags,omitempty"` - Timestamp time.Time `json:"timestamp,omitempty"` //XXX omitempty doesn't work on time.Time - Fields map[string]interface{} `json:"fields,omitempty"` - Precision string `json:"precision,omitempty"` + Name string + Tags map[string]string + Timestamp time.Time + Fields map[string]interface{} + Precision string } // MarshalJSON will format the time in RFC3339Nano From 91014def56686a372ab7f7961892754d96dbc4be Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Tue, 17 Mar 2015 15:48:54 -0700 Subject: [PATCH 11/13] sleep hack --- httpd/handler_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 014a0abccbc..1f5b987e993 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -1158,6 +1158,7 @@ func TestHandler_serveDump(t *testing.T) { t.Fatalf("unexpected status for get: %d", status) } + time.Sleep(500 * time.Millisecond) // Shouldn't committed data be readable? query = map[string]string{"db": "foo"} status, body = MustHTTP("GET", s.URL+`/dump`, query, nil, "") if status != http.StatusOK { From 21fd2e2662a4fb729f55d6c3cc123ed271d749b1 Mon Sep 17 00:00:00 2001 From: Joseph Rothrock Date: Wed, 18 Mar 2015 14:10:36 -0700 Subject: [PATCH 12/13] dump cmd rename dump flag put an error response body if json marshalling breaks. detect and respond to http error codes in the client. --- client/influxdb.go | 3 +++ cmd/influx/main.go | 28 ++++++++++++++-------------- httpd/handler.go | 3 ++- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index b4872f8ab23..e46a83f072f 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -164,6 +164,9 @@ func (c *Client) Dump(db string) (io.ReadCloser, error) { if err != nil { return nil, err } + if resp.StatusCode != http.StatusOK { + return resp.Body, fmt.Errorf("HTTP Protocol error %d", resp.StatusCode) + } return resp.Body, nil } diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 5ef8aa65d5a..a095018fbb9 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -32,17 +32,17 @@ const ( ) type CommandLine struct { - Client *client.Client - Line *liner.State - Host string - Port int - Username string - Password string - Database string - Version string - Pretty bool // controls pretty print for json - Format string // controls the output format. Valid values are json, csv, or column - Dump bool + Client *client.Client + Line *liner.State + Host string + Port int + Username string + Password string + Database string + Version string + Pretty bool // controls pretty print for json + Format string // controls the output format. Valid values are json, csv, or column + DumpTheDatabase bool } func main() { @@ -55,7 +55,7 @@ func main() { fs.StringVar(&c.Password, "password", c.Password, `password to connect to the server. Leaving blank will prompt for password (--password="")`) fs.StringVar(&c.Database, "database", c.Database, "database to connect to the server.") fs.StringVar(&c.Format, "output", default_format, "format specifies the format of the server responses: json, csv, or column") - fs.BoolVar(&c.Dump, "dump", false, "dump the contents of the given database to stdout") + fs.BoolVar(&c.DumpTheDatabase, "dump-the-database", false, "dump the contents of the given database to stdout") fs.Parse(os.Args[1:]) var promptForPassword bool @@ -82,7 +82,7 @@ func main() { c.connect("") - if c.Dump { + if c.DumpTheDatabase { c.dump() return } @@ -212,7 +212,7 @@ func (c *CommandLine) connect(cmd string) { fmt.Printf("Failed to connect to %s\n", c.Client.Addr()) } else { c.Version = v - if !c.Dump { + if !c.DumpTheDatabase { fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) } } diff --git a/httpd/handler.go b/httpd/handler.go index 3b1687d2385..00997e7a578 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -295,7 +295,8 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx buf, err := json.Marshal(&batch) if err != nil { - httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError) + w.Write([]byte("*** SERVER-SIDE ERROR. MISSING DATA ***")) + w.Write(delim) return } w.Write(buf) From 8425c0c57858bc2b2b3c2ce7252fba344c18b8dd Mon Sep 17 00:00:00 2001 From: Todd Persen Date: Wed, 18 Mar 2015 23:02:11 -0700 Subject: [PATCH 13/13] Final cleanups based on review feedback. --- cmd/influx/main.go | 36 ++++++++++++++++-------------------- httpd/handler.go | 5 +++++ 2 files changed, 21 insertions(+), 20 deletions(-) diff --git a/cmd/influx/main.go b/cmd/influx/main.go index a095018fbb9..4fb180dce66 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "encoding/csv" "encoding/json" "flag" @@ -32,17 +31,17 @@ const ( ) type CommandLine struct { - Client *client.Client - Line *liner.State - Host string - Port int - Username string - Password string - Database string - Version string - Pretty bool // controls pretty print for json - Format string // controls the output format. Valid values are json, csv, or column - DumpTheDatabase bool + Client *client.Client + Line *liner.State + Host string + Port int + Username string + Password string + Database string + Version string + Pretty bool // controls pretty print for json + Format string // controls the output format. Valid values are json, csv, or column + ShouldDump bool } func main() { @@ -55,7 +54,7 @@ func main() { fs.StringVar(&c.Password, "password", c.Password, `password to connect to the server. Leaving blank will prompt for password (--password="")`) fs.StringVar(&c.Database, "database", c.Database, "database to connect to the server.") fs.StringVar(&c.Format, "output", default_format, "format specifies the format of the server responses: json, csv, or column") - fs.BoolVar(&c.DumpTheDatabase, "dump-the-database", false, "dump the contents of the given database to stdout") + fs.BoolVar(&c.ShouldDump, "dump", false, "dump the contents of the given database to stdout") fs.Parse(os.Args[1:]) var promptForPassword bool @@ -82,7 +81,7 @@ func main() { c.connect("") - if c.DumpTheDatabase { + if c.ShouldDump { c.dump() return } @@ -212,7 +211,7 @@ func (c *CommandLine) connect(cmd string) { fmt.Printf("Failed to connect to %s\n", c.Client.Addr()) } else { c.Version = v - if !c.DumpTheDatabase { + if !c.ShouldDump { fmt.Printf("Connected to %s version %s\n", c.Client.Addr(), c.Version) } } @@ -264,11 +263,8 @@ func (c *CommandLine) dump() { if err != nil { fmt.Printf("Dump failed. %s\n", err) } else { - scanner := bufio.NewScanner(response) - for scanner.Scan() { - fmt.Println(scanner.Text()) - } - if err := scanner.Err(); err != nil { + _, err := io.Copy(os.Stdout, response) + if err != nil { fmt.Printf("Dump failed. %s\n", err) } } diff --git a/httpd/handler.go b/httpd/handler.go index 00997e7a578..2cba2280598 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -294,6 +294,11 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx } buf, err := json.Marshal(&batch) + // TODO: Make this more legit in the future + // Since we're streaming data as chunked responses, this error could + // be in the middle of an already-started data stream. Until Go 1.5, + // we can't really support proper trailer headers, so we'll just + // wait until then: https://code.google.com/p/go/issues/detail?id=7759 if err != nil { w.Write([]byte("*** SERVER-SIDE ERROR. MISSING DATA ***")) w.Write(delim)