From 6f61c0ea4a7fd494ec1092554e9ec3309a5d5f29 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 29 Apr 2016 09:00:21 -0400 Subject: [PATCH] Add POST /query endpoint and warning messages for using GET with write operations In order to follow REST a bit more carefully, all write operations should go through a POST in the future. We still allow read operations through either GET or POST (similar to the Graphite /render endpoint), but write operations will trigger a returned warning as part of the JSON response and will eventually return an error. Also updates the Golang client libraries to always use POST instead of GET. Fixes #6290. --- CHANGELOG.md | 6 +-- client/influxdb.go | 2 +- client/v2/client.go | 2 +- cluster/statement_executor.go | 73 ++++++++++++++++++++++++-- cluster/statement_executor_test.go | 2 +- cmd/influxd/run/server_helpers_test.go | 2 +- influxql/query_executor.go | 16 ++++-- influxql/query_executor_test.go | 22 ++++---- influxql/result.go | 13 +++++ services/continuous_querier/service.go | 2 +- services/httpd/handler.go | 20 ++++--- 11 files changed, 125 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 745da2288ff..58c7295ed68 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,7 +20,7 @@ - [#4675](https://github.com/influxdata/influxdb/issues/4675): Allow derivative() function to be used with ORDER BY desc. - [#6483](https://github.com/influxdata/influxdb/pull/6483): Delete series support for TSM - [#6484](https://github.com/influxdata/influxdb/pull/6484): Query language support for DELETE - +- [#6290](https://github.com/influxdata/influxdb/issues/6290): Add POST /query endpoint and warning messages for using GET with write operations. ### Bugfixes @@ -47,8 +47,8 @@ - [#6468](https://github.com/influxdata/influxdb/issues/6468): Panic with truncated wal segments - [#6491](https://github.com/influxdata/influxdb/pull/6491): Fix the CLI not to enter an infinite loop when the liner has an error. - [#6457](https://github.com/influxdata/influxdb/issues/6457): Retention policy cleanup does not remove series -- [#6477] (https://github.com/influxdata/influxdb/pull/6477): Don't catch SIGQUIT or SIGHUP signals. -- [#6468](https://github.com/influxdata/influxdb/issues/6468): Panic with truncated wal segments +- [#6477](https://github.com/influxdata/influxdb/pull/6477): Don't catch SIGQUIT or SIGHUP signals. +- [#6468](https://github.com/influxdata/influxdb/issues/6468): Panic with truncated wal segments - [#6480](https://github.com/influxdata/influxdb/issues/6480): Fix SHOW statements' rewriting bug - [#6505](https://github.com/influxdata/influxdb/issues/6505): Add regex literal to InfluxQL spec for FROM clause. diff --git a/client/influxdb.go b/client/influxdb.go index 020c2cd3b86..90695b9ed21 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -181,7 +181,7 @@ func (c *Client) Query(q Query) (*Response, error) { } u.RawQuery = values.Encode() - req, err := http.NewRequest("GET", u.String(), nil) + req, err := http.NewRequest("POST", u.String(), nil) if err != nil { return nil, err } diff --git a/client/v2/client.go b/client/v2/client.go index 3776eab3040..a5b63191376 100644 --- a/client/v2/client.go +++ b/client/v2/client.go @@ -541,7 +541,7 @@ func (c *client) Query(q Query) (*Response, error) { u := c.url u.Path = "query" - req, err := http.NewRequest("GET", u.String(), nil) + req, err := http.NewRequest("POST", u.String(), nil) if err != nil { return nil, err } diff --git a/cluster/statement_executor.go b/cluster/statement_executor.go index 58eb4522b61..c755aeba4b5 100644 --- a/cluster/statement_executor.go +++ b/cluster/statement_executor.go @@ -48,10 +48,19 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ var err error switch stmt := stmt.(type) { case *influxql.AlterRetentionPolicyStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeAlterRetentionPolicyStatement(stmt) case *influxql.CreateContinuousQueryStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeCreateContinuousQueryStatement(stmt) case *influxql.CreateDatabaseStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } if stmt.IfNotExists { ctx.Log.Println("WARNING: IF NOT EXISTS is deprecated as of v0.13.0 and will be removed in v1.0") messages = append(messages, &influxql.Message{ @@ -61,36 +70,81 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ } err = e.executeCreateDatabaseStatement(stmt) case *influxql.CreateRetentionPolicyStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeCreateRetentionPolicyStatement(stmt) case *influxql.CreateSubscriptionStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeCreateSubscriptionStatement(stmt) case *influxql.CreateUserStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeCreateUserStatement(stmt) case *influxql.DeleteSeriesStatement: err = e.executeDeleteSeriesStatement(stmt, ctx.Database) case *influxql.DropContinuousQueryStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeDropContinuousQueryStatement(stmt) case *influxql.DropDatabaseStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeDropDatabaseStatement(stmt) case *influxql.DropMeasurementStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeDropMeasurementStatement(stmt, ctx.Database) case *influxql.DropSeriesStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeDropSeriesStatement(stmt, ctx.Database) case *influxql.DropRetentionPolicyStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeDropRetentionPolicyStatement(stmt) case *influxql.DropShardStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeDropShardStatement(stmt) case *influxql.DropSubscriptionStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeDropSubscriptionStatement(stmt) case *influxql.DropUserStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeDropUserStatement(stmt) case *influxql.GrantStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeGrantStatement(stmt) case *influxql.GrantAdminStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeGrantAdminStatement(stmt) case *influxql.RevokeStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeRevokeStatement(stmt) case *influxql.RevokeAdminStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeRevokeAdminStatement(stmt) case *influxql.ShowContinuousQueriesStatement: rows, err = e.executeShowContinuousQueriesStatement(stmt) @@ -113,6 +167,9 @@ func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influ case *influxql.ShowUsersStatement: rows, err = e.executeShowUsersStatement(stmt) case *influxql.SetPasswordUserStatement: + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } err = e.executeSetPasswordUserStatement(stmt) default: return influxql.ErrInvalidQuery @@ -423,11 +480,6 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen break } - result := &influxql.Result{ - StatementID: ctx.StatementID, - Series: []*models.Row{row}, - } - // Write points back into system for INTO statements. if stmt.Target != nil { if err := e.writeInto(stmt, row); err != nil { @@ -437,6 +489,11 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen continue } + result := &influxql.Result{ + StatementID: ctx.StatementID, + Series: []*models.Row{row}, + } + // Send results or exit if closing. select { case <-ctx.InterruptCh: @@ -449,8 +506,14 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen // Emit write count if an INTO statement. if stmt.Target != nil { + var messages []*influxql.Message + if ctx.ReadOnly { + messages = append(messages, influxql.ReadOnlyWarning(stmt.String())) + } + ctx.Results <- &influxql.Result{ StatementID: ctx.StatementID, + Messages: messages, Series: []*models.Row{{ Name: "result", Columns: []string{"time", "written"}, diff --git a/cluster/statement_executor_test.go b/cluster/statement_executor_test.go index 83be97c735b..7d4ce2da3cb 100644 --- a/cluster/statement_executor_test.go +++ b/cluster/statement_executor_test.go @@ -199,7 +199,7 @@ func DefaultQueryExecutor() *QueryExecutor { // ExecuteQuery parses query and executes against the database. func (e *QueryExecutor) ExecuteQuery(query, database string, chunkSize int) <-chan *influxql.Result { - return e.QueryExecutor.ExecuteQuery(MustParseQuery(query), database, chunkSize, make(chan struct{})) + return e.QueryExecutor.ExecuteQuery(MustParseQuery(query), database, chunkSize, false, make(chan struct{})) } // TSDBStore is a mockable implementation of cluster.TSDBStore. diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index 4124c96b71a..79ae29f47b0 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -143,7 +143,7 @@ func (s *Server) QueryWithParams(query string, values url.Values) (results strin v, _ = url.ParseQuery(values.Encode()) } v.Set("q", query) - return s.HTTPGet(s.URL() + "/query?" + v.Encode()) + return s.HTTPPost(s.URL()+"/query?"+v.Encode(), nil) } // MustQueryWithParams executes a query against the server and returns the results. diff --git a/influxql/query_executor.go b/influxql/query_executor.go index 31e5957de76..e029068cd8b 100644 --- a/influxql/query_executor.go +++ b/influxql/query_executor.go @@ -79,6 +79,9 @@ type ExecutionContext struct { // The requested maximum number of points to return in each result. ChunkSize int + // If this query is being executed in a read-only context. + ReadOnly bool + // Hold the query executor's logger. Log *log.Logger @@ -158,13 +161,13 @@ func (e *QueryExecutor) SetLogOutput(w io.Writer) { } // ExecuteQuery executes each statement within a query. -func (e *QueryExecutor) ExecuteQuery(query *Query, database string, chunkSize int, closing chan struct{}) <-chan *Result { +func (e *QueryExecutor) ExecuteQuery(query *Query, database string, chunkSize int, readonly bool, closing chan struct{}) <-chan *Result { results := make(chan *Result) - go e.executeQuery(query, database, chunkSize, closing, results) + go e.executeQuery(query, database, chunkSize, readonly, closing, results) return results } -func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize int, closing <-chan struct{}, results chan *Result) { +func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize int, readonly bool, closing <-chan struct{}, results chan *Result) { defer close(results) defer e.recover(query, results) @@ -188,6 +191,7 @@ func (e *QueryExecutor) executeQuery(query *Query, database string, chunkSize in Results: results, Database: database, ChunkSize: chunkSize, + ReadOnly: readonly, Log: e.Logger, InterruptCh: task.closing, } @@ -240,9 +244,15 @@ loop: } continue loop case *KillQueryStatement: + var messages []*Message + if ctx.ReadOnly { + messages = append(messages, ReadOnlyWarning(stmt.String())) + } + err := e.executeKillQueryStatement(stmt) results <- &Result{ StatementID: i, + Messages: messages, Err: err, } diff --git a/influxql/query_executor_test.go b/influxql/query_executor_test.go index d819e10a708..377ce4c0518 100644 --- a/influxql/query_executor_test.go +++ b/influxql/query_executor_test.go @@ -39,7 +39,7 @@ func TestQueryExecutor_AttachQuery(t *testing.T) { }, } - discardOutput(e.ExecuteQuery(q, "mydb", 100, nil)) + discardOutput(e.ExecuteQuery(q, "mydb", 100, false, nil)) } func TestQueryExecutor_KillQuery(t *testing.T) { @@ -64,12 +64,12 @@ func TestQueryExecutor_KillQuery(t *testing.T) { }, } - results := e.ExecuteQuery(q, "mydb", 100, nil) + results := e.ExecuteQuery(q, "mydb", 100, false, nil) q, err = influxql.ParseQuery(fmt.Sprintf("KILL QUERY %d", <-qid)) if err != nil { t.Fatal(err) } - discardOutput(e.ExecuteQuery(q, "mydb", 100, nil)) + discardOutput(e.ExecuteQuery(q, "mydb", 100, false, nil)) result := <-results if result.Err != influxql.ErrQueryInterrupted { @@ -97,7 +97,7 @@ func TestQueryExecutor_Interrupt(t *testing.T) { } closing := make(chan struct{}) - results := e.ExecuteQuery(q, "mydb", 100, closing) + results := e.ExecuteQuery(q, "mydb", 100, false, closing) close(closing) result := <-results if result.Err != influxql.ErrQueryInterrupted { @@ -124,7 +124,7 @@ func TestQueryExecutor_ShowQueries(t *testing.T) { t.Fatal(err) } - results := e.ExecuteQuery(q, "", 100, nil) + results := e.ExecuteQuery(q, "", 100, false, nil) result := <-results if len(result.Series) != 1 { t.Errorf("expected %d rows, got %d", 1, len(result.Series)) @@ -154,7 +154,7 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) { } e.QueryTimeout = time.Nanosecond - results := e.ExecuteQuery(q, "mydb", 100, nil) + results := e.ExecuteQuery(q, "mydb", 100, false, nil) result := <-results if result.Err != influxql.ErrQueryTimeoutReached { t.Errorf("unexpected error: %s", result.Err) @@ -181,11 +181,11 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) { defer e.Close() // Start first query and wait for it to be executing. - go discardOutput(e.ExecuteQuery(q, "mydb", 100, nil)) + go discardOutput(e.ExecuteQuery(q, "mydb", 100, false, nil)) <-qid // Start second query and expect for it to fail. - results := e.ExecuteQuery(q, "mydb", 100, nil) + results := e.ExecuteQuery(q, "mydb", 100, false, nil) select { case result := <-results: @@ -219,7 +219,7 @@ func TestQueryExecutor_Close(t *testing.T) { }, } - results := e.ExecuteQuery(q, "mydb", 100, nil) + results := e.ExecuteQuery(q, "mydb", 100, false, nil) go func(results <-chan *influxql.Result) { result := <-results if result.Err != influxql.ErrQueryEngineShutdown { @@ -240,7 +240,7 @@ func TestQueryExecutor_Close(t *testing.T) { t.Error("closing the query manager did not kill the query after 100 milliseconds") } - results = e.ExecuteQuery(q, "mydb", 100, nil) + results = e.ExecuteQuery(q, "mydb", 100, false, nil) result := <-results if len(result.Series) != 0 { t.Errorf("expected %d rows, got %d", 0, len(result.Series)) @@ -263,7 +263,7 @@ func TestQueryExecutor_Panic(t *testing.T) { }, } - results := e.ExecuteQuery(q, "mydb", 100, nil) + results := e.ExecuteQuery(q, "mydb", 100, false, nil) result := <-results if len(result.Series) != 0 { t.Errorf("expected %d rows, got %d", 0, len(result.Series)) diff --git a/influxql/result.go b/influxql/result.go index d1f2085bb12..0069208fb85 100644 --- a/influxql/result.go +++ b/influxql/result.go @@ -3,6 +3,7 @@ package influxql import ( "encoding/json" "errors" + "fmt" "github.com/influxdata/influxdb/models" ) @@ -40,6 +41,18 @@ type Message struct { Text string `json:"text"` } +// ReadOnlyWarning generates a warning message that tells the user the command +// they are using is being used for writing in a read only context. +// +// This is a temporary method while to be used while transitioning to read only +// operations for issue #6290. +func ReadOnlyWarning(stmt string) *Message { + return &Message{ + Level: WarningLevel, + Text: fmt.Sprintf("deprecated use of '%s' in a read only context, please use a POST request instead", stmt), + } +} + // Result represents a resultset returned from a single statement. // Rows represents a list of rows that can be sorted consistently by name/tag. type Result struct { diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 60858a4f528..f42f893a628 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -348,7 +348,7 @@ func (s *Service) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { defer close(closing) // Execute the SELECT. - ch := s.QueryExecutor.ExecuteQuery(q, cq.Database, NoChunkingSize, closing) + ch := s.QueryExecutor.ExecuteQuery(q, cq.Database, NoChunkingSize, false, closing) // There is only one statement, so we will only ever receive one result res, ok := <-ch diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 13422e3adb2..10bef806c2b 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -103,6 +103,10 @@ func NewHandler(requireAuthentication, loggingEnabled, writeTrace bool, rowLimit "query", // Query serving route. "GET", "/query", true, true, h.serveQuery, }, + Route{ + "query", // Query serving route. + "POST", "/query", true, true, h.serveQuery, + }, Route{ "write-options", // Satisfy CORS checks. "OPTIONS", "/write", true, true, h.serveOptions, @@ -243,19 +247,18 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. h.statMap.Add(statQueryRequestDuration, time.Since(start).Nanoseconds()) }(time.Now()) - q := r.URL.Query() - pretty := q.Get("pretty") == "true" + pretty := r.FormValue("pretty") == "true" - qp := strings.TrimSpace(q.Get("q")) + qp := strings.TrimSpace(r.FormValue("q")) if qp == "" { httpError(w, `missing required parameter "q"`, pretty, http.StatusBadRequest) return } - epoch := strings.TrimSpace(q.Get("epoch")) + epoch := strings.TrimSpace(r.FormValue("epoch")) p := influxql.NewParser(strings.NewReader(qp)) - db := q.Get("db") + db := r.FormValue("db") // Sanitize the request query params so it doesn't show up in the response logger. // Do this before anything else so a parsing error doesn't leak passwords. @@ -280,10 +283,10 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. } // Parse chunk size. Use default if not provided or unparsable. - chunked := (q.Get("chunked") == "true") + chunked := (r.FormValue("chunked") == "true") chunkSize := DefaultChunkSize if chunked { - if n, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64); err == nil && int(n) > 0 { + if n, err := strconv.ParseInt(r.FormValue("chunk_size"), 10, 64); err == nil && int(n) > 0 { chunkSize = int(n) } } @@ -314,7 +317,8 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta. // Execute query. w.Header().Add("Connection", "close") w.Header().Add("content-type", "application/json") - results := h.QueryExecutor.ExecuteQuery(query, db, chunkSize, closing) + readonly := r.Method == "GET" || r.Method == "HEAD" + results := h.QueryExecutor.ExecuteQuery(query, db, chunkSize, readonly, closing) // if we're not chunking, this will be the in memory buffer for all results before sending to client resp := Response{Results: make([]*influxql.Result, 0)}