Skip to content

Commit

Permalink
Add HTTP endpoint that serves a requested shard
Browse files Browse the repository at this point in the history
With this change a datanode can stream the requested shard to the
client. An error is returned if the shard does not exist or the the
shard is not local to that node.

1 data node can hit this endpoint to request data for a given shard if
the data no longer resides on the broker.
  • Loading branch information
otoolep committed Apr 22, 2015
1 parent 20ee9e3 commit e75e6a9
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 4 deletions.
22 changes: 22 additions & 0 deletions httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func NewClusterHandler(s *influxdb.Server, requireAuthentication, snapshotEnable
"metastore",
"GET", "/data/metastore", false, false, h.serveMetastore,
},
route{ // Shards for peer-to-peer replication
"shard",
"GET", "/data/shard/:id", false, false, h.serveShard,
},
route{ // Tell data node to run CQs that should be run
"process_continuous_queries",
"POST", "/data/process_continuous_queries", false, false, h.serveProcessContinuousQueries,
Expand Down Expand Up @@ -547,6 +551,24 @@ func (h *Handler) serveMetastore(w http.ResponseWriter, r *http.Request) {
}
}

// serveShard returns a copy of the requested shard.
func (h *Handler) serveShard(w http.ResponseWriter, r *http.Request) {
id := r.URL.Query().Get(":id")
shardID, err := strconv.ParseUint(id, 10, 64)
if err != nil {
httpError(w, fmt.Sprintf("invalid shard ID %s: %s", id, err), false, http.StatusBadRequest)
return
}

// Set headers.
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, id))

if err := h.server.CopyShard(w, shardID); err != nil {
httpError(w, err.Error(), false, http.StatusInternalServerError)
}
}

// serveStatus returns a set of states that the server is currently in.
func (h *Handler) serveStatus(w http.ResponseWriter, r *http.Request) {
w.Header().Add("content-type", "application/json")
Expand Down
11 changes: 7 additions & 4 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,15 @@ var (
// policy on a database but the default has not been set.
ErrDefaultRetentionPolicyNotFound = errors.New("default retention policy not found")

// ErrShardNotFound is returned writing to a non-existent shard.
// ErrShardNotFound is returned when attempting to access a non-existent shard
ErrShardNotFound = errors.New("shard not found")

// ErrShardNotLocal is returned when a server attempts to access a shard that is not local
ErrShardNotLocal = errors.New("shard not local")

// ErrShardShortRead returned when the number of bytes read from a shard is less than expected.
ErrShardShortRead = errors.New("shard read returned insufficient data")

// ErrInvalidPointBuffer is returned when a buffer containing data for writing is invalid
ErrInvalidPointBuffer = errors.New("invalid point buffer")

Expand Down Expand Up @@ -143,9 +149,6 @@ var (

// ErrContinuousQueryNotFound is returned when dropping a nonexistent continuous query.
ErrContinuousQueryNotFound = errors.New("continuous query not found")

// ErrShardNotLocal is thrown whan a server attempts to run a mapper against a shard it doesn't have a copy of.
ErrShardNotLocal = errors.New("shard not local")
)

func ErrDatabaseNotFound(name string) error { return Errorf("database not found: %s", name) }
Expand Down
33 changes: 33 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -814,6 +814,39 @@ func (s *Server) CopyMetastore(w io.Writer) error {
})
}

// CopyShard writes the requested shard to a writer.
func (s *Server) CopyShard(w io.Writer, shardID uint64) error {
s.mu.RLock()

// Locate the shard.
sh, ok := s.shards[shardID]
if !ok {
s.mu.RUnlock()
return ErrShardNotFound
}
if sh.store == nil {
s.mu.RUnlock()
return ErrShardNotLocal
}

return sh.view(func(tx *shardtx) error {
s.mu.RUnlock() // Unlock so not held during long read.
sz := int(tx.Size())

// Set content length if this is a HTTP connection.
if w, ok := w.(http.ResponseWriter); ok {
w.Header().Set("Content-Length", strconv.Itoa(sz))
}

// Write entire shard to the writer.
n, err := tx.WriteTo(w)
if n != int64(sz) {
return ErrShardShortRead
}
return err
})
}

// DataNode returns a data node by id.
func (s *Server) DataNode(id uint64) *DataNode {
s.mu.RLock()
Expand Down
25 changes: 25 additions & 0 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,6 +1401,31 @@ func TestServer_DeleteShardGroup(t *testing.T) {
}
}

// Ensure the server can stream shards to client
func TestServer_CopyShard(t *testing.T) {
c := test.NewDefaultMessagingClient()
defer c.Close()
s := OpenServer(c)
defer s.Close()
s.CreateDatabase("foo")
s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "raw", Duration: 1 * time.Hour})
s.SetDefaultRetentionPolicy("foo", "raw")

// Write series with one point to the database to ensure shard 1 is created.
s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "series1", Fields: map[string]interface{}{"value": float64(20)}}})
time.Sleep(time.Millisecond * 100)

err := s.CopyShard(ioutil.Discard, 1234)
if err != influxdb.ErrShardNotFound {
t.Errorf("received unexpected result when requesting non-existing shard: %v", err)
}

err = s.CopyShard(ioutil.Discard, 1)
if err != nil {
t.Errorf("failed to copy shard 1: %s", err.Error())
}
}

/* TODO(benbjohnson): Change test to not expose underlying series ids directly.
func TestServer_Measurements(t *testing.T) {
c := test.NewDefaultMessagingClient()
Expand Down
22 changes: 22 additions & 0 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func (g *ShardGroup) dropSeries(seriesIDs ...uint64) error {
return nil
}

// shardtx represents a shard transaction.
type shardtx struct {
*bolt.Tx
}

// Shard represents the logical storage for a given time range.
// The instance on a local server may contain the raw data in "store" if the
// shard is assigned to the server's data node id.
Expand Down Expand Up @@ -133,6 +138,23 @@ func shardMetaIndex(tx *bolt.Tx) uint64 {
return index
}

// view executes a function in the context of a read-only transaction.
func (s *Shard) view(fn func(*shardtx) error) error {
return s.store.View(func(tx *bolt.Tx) error { return fn(&shardtx{tx}) })
}

// mustView executes a function in the context of a read-only transaction.
// Panics if system error occurs. Return error from the fn for validation errors.
func (s *Shard) mustView(fn func(*shardtx) error) (err error) {
if e := s.view(func(tx *shardtx) error {
err = fn(tx)
return nil
}); e != nil {
panic("shard view: " + e.Error())
}
return
}

// Index returns the highest Raft index processed by this shard. Shard RLock
// held during execution.
func (s *Shard) Index() uint64 {
Expand Down

0 comments on commit e75e6a9

Please sign in to comment.