Skip to content

Commit

Permalink
Merge pull request #2696 from influxdb/jw-write-path
Browse files Browse the repository at this point in the history
Line protocol write API
  • Loading branch information
jwilder committed May 29, 2015
2 parents 6bd781c + 870a183 commit 99bc7d2
Show file tree
Hide file tree
Showing 7 changed files with 1,270 additions and 70 deletions.
2 changes: 1 addition & 1 deletion client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ func EpochToTime(epoch int64, precision string) (time.Time, error) {
case "n":
t = time.Unix(0, epoch)
default:
return time.Time{}, fmt.Errorf("Unknowm precision %q", precision)
return time.Time{}, fmt.Errorf("Unknown precision %q", precision)
}
return t, nil
}
Expand Down
103 changes: 103 additions & 0 deletions httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"github.com/bmizerany/pat"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/tsdb"
"github.com/influxdb/influxdb/uuid"
)

Expand Down Expand Up @@ -123,6 +125,10 @@ func NewAPIHandler(s *influxdb.Server, requireAuthentication, loggingEnabled boo
"write", // Data-ingest route.
"POST", "/write", true, true, h.serveWrite,
},
route{
"write_points", // Data-ingest route.
"POST", "/write_points", true, true, h.serveWritePoints,
},
route{ // Status
"status",
"GET", "/status", true, true, h.serveStatus,
Expand Down Expand Up @@ -538,6 +544,103 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ
}
}

// serveWritePoints receives incoming series data and writes it to the database.
func (h *Handler) serveWritePoints(w http.ResponseWriter, r *http.Request, user *influxdb.User) {
var writeError = func(result influxdb.Result, statusCode int) {
w.WriteHeader(statusCode)
w.Write([]byte(result.Err.Error()))
return
}

// Check to see if we have a gzip'd post
var body io.ReadCloser
if r.Header.Get("Content-encoding") == "gzip" {
b, err := gzip.NewReader(r.Body)
if err != nil {
writeError(influxdb.Result{Err: err}, http.StatusBadRequest)
return
}
body = b
defer r.Body.Close()
} else {
body = r.Body
defer r.Body.Close()
}

b, err := ioutil.ReadAll(body)
if err != nil {
if h.WriteTrace {
h.Logger.Print("write handler failed to read bytes from request body")
}
writeError(influxdb.Result{Err: err}, http.StatusBadRequest)
return
}
if h.WriteTrace {
h.Logger.Printf("write body received by handler: %s", string(b))
}

precision := r.FormValue("precision")
if precision == "" {
precision = "n"
}

points, err := tsdb.ParsePointsWithPrecision(b, time.Now().UTC(), precision)
if err != nil {
if err.Error() == "EOF" {
w.WriteHeader(http.StatusOK)
return
}
writeError(influxdb.Result{Err: err}, http.StatusBadRequest)
return
}

database := r.FormValue("db")
if database == "" {
writeError(influxdb.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest)
return
}

if !h.server.DatabaseExists(database) {
writeError(influxdb.Result{Err: fmt.Errorf("database not found: %q", database)}, http.StatusNotFound)
return
}

if h.requireAuthentication && user == nil {
writeError(influxdb.Result{Err: fmt.Errorf("user is required to write to database %q", database)}, http.StatusUnauthorized)
return
}

if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, database) {
writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, database)}, http.StatusUnauthorized)
return
}

retentionPolicy := r.Form.Get("rp")
consistencyVal := r.Form.Get("consistency")
consistency := cluster.ConsistencyLevelOne
switch consistencyVal {
case "all":
consistency = cluster.ConsistencyLevelAll
case "any":
consistency = cluster.ConsistencyLevelAny
case "one":
consistency = cluster.ConsistencyLevelOne
case "quorum":
consistency = cluster.ConsistencyLevelQuorum
}

if err := h.server.WritePoints(database, retentionPolicy, consistency, points); err != nil {
if influxdb.IsClientError(err) {
writeError(influxdb.Result{Err: err}, http.StatusBadRequest)
} else {
writeError(influxdb.Result{Err: err}, http.StatusInternalServerError)
}
return
} else {
w.WriteHeader(http.StatusNoContent)
}
}

// serveMetastore returns a copy of the metastore.
func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request) {
// Set headers.
Expand Down
11 changes: 11 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1848,6 +1848,17 @@ func (s *Server) DropSeries(database string, seriesByMeasurement map[string][]ui
return err
}

func (s *Server) WritePoints(database, retentionPolicy string, consistency cluster.ConsistencyLevel, points []tsdb.Point) error {
wpr := &cluster.WritePointsRequest{
Database: database,
RetentionPolicy: retentionPolicy,
ConsistencyLevel: consistency,
Points: points,
}

return s.pw.Write(wpr)
}

// WriteSeries writes series data to the database.
// Returns the messaging index the data was written to.
func (s *Server) WriteSeries(database, retentionPolicy string, points []tsdb.Point) (idx uint64, err error) {
Expand Down
Loading

0 comments on commit 99bc7d2

Please sign in to comment.