Skip to content

Commit

Permalink
Merge pull request #5753 from influxdata/er-meta-queries
Browse files Browse the repository at this point in the history
Support mutable meta queries in a cluster
  • Loading branch information
benbjohnson committed Feb 24, 2016
2 parents 3cdb4c1 + b306476 commit 23ef15b
Show file tree
Hide file tree
Showing 15 changed files with 557 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
- [#5719](https://github.com/influxdata/influxdb/issues/5719): Fix cache not deduplicating points
- [#5754](https://github.com/influxdata/influxdb/issues/5754): Adding a node as meta only results in a data node also being registered
- [#5787](https://github.com/influxdata/influxdb/pull/5787): HTTP: Add QueryAuthorizer instance to httpd service’s handler. @chris-ramon
- [#5753](https://github.com/influxdata/influxdb/pull/5753): Ensure that drop-type commands (e.g. `DROP DATABASE`) work correctly in a cluster

## v0.10.1 [2016-02-18]

Expand Down
59 changes: 59 additions & 0 deletions cluster/internal/data.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions cluster/internal/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,13 @@ message MapShardResponse {
repeated string TagSets = 4;
repeated string Fields = 5;
}

// ExecuteStatementRequest asks a data node to execute a single InfluxQL
// statement (sent as its string form) against the named database.
message ExecuteStatementRequest {
	required string Statement = 1;
	required string Database  = 2;
}

// ExecuteStatementResponse reports the result of executing a statement.
// Code 0 means success; any other code is an error and Message carries
// the error text.
message ExecuteStatementResponse {
	required int32  Code    = 1;
	optional string Message = 2;
}
171 changes: 171 additions & 0 deletions cluster/meta_executor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package cluster

import (
"fmt"
"log"
"net"
"os"
"sync"
"time"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)

const (
	// metaExecutorWriteTimeout is the deadline applied to writes (and, in
	// executeOnNode, reads) on connections to remote data nodes.
	metaExecutorWriteTimeout = 5 * time.Second
	// metaExecutorMaxWriteConnections caps each per-node connection pool.
	metaExecutorMaxWriteConnections = 10
)

// MetaExecutor executes meta queries on all data nodes.
type MetaExecutor struct {
	// NOTE(review): mu is never referenced in this file — presumably
	// intended to guard lazy pool creation in dial; confirm.
	mu             sync.RWMutex
	timeout        time.Duration // per-operation read/write deadline
	pool           *clientPool   // connection pools keyed by node ID
	maxConnections int           // upper bound on connections per node
	Logger         *log.Logger
	Node           *influxdb.Node // the local node; statements are never sent to it

	// nodeExecutor performs the per-node execution. It defaults to the
	// MetaExecutor itself (see NewMetaExecutor) and is replaced by a mock
	// in tests.
	nodeExecutor interface {
		executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error
	}

	// MetaClient supplies the set of data nodes a statement must run on.
	MetaClient interface {
		DataNode(id uint64) (ni *meta.NodeInfo, err error)
		DataNodes() ([]meta.NodeInfo, error)
	}
}

// NewMetaExecutor returns a new initialized *MetaExecutor.
func NewMetaExecutor() *MetaExecutor {
	var e MetaExecutor
	e.timeout = metaExecutorWriteTimeout
	e.pool = newClientPool()
	e.maxConnections = metaExecutorMaxWriteConnections
	e.Logger = log.New(os.Stderr, "[meta-executor] ", log.LstdFlags)

	// By default the executor dispatches to itself; tests may override
	// nodeExecutor with a mock.
	e.nodeExecutor = &e

	return &e
}

// remoteNodeError wraps an error with context about a node that
// returned the error.
type remoteNodeError struct {
id uint64
err error
}

func (e remoteNodeError) Error() string {
return fmt.Sprintf("partial success, node %d may be down (%s)", e.id, e.err)
}

// ExecuteStatement executes a single InfluxQL statement on all nodes in the
// cluster concurrently, skipping the local node. It returns nil if every
// remote node succeeded, otherwise the first error received.
func (m *MetaExecutor) ExecuteStatement(stmt influxql.Statement, database string) error {
	// Get a list of all nodes the query needs to be executed on.
	nodes, err := m.MetaClient.DataNodes()
	if err != nil {
		return err
	} else if len(nodes) < 1 {
		return nil
	}

	// Start a goroutine to execute the statement on each of the remote nodes.
	// The channel is buffered for len(nodes) errors, not len(nodes)-1: if the
	// local node's ID is not present in the data node list (e.g. a meta-only
	// node), every goroutine may send, and a smaller buffer would block one
	// sender forever and deadlock wg.Wait().
	var wg sync.WaitGroup
	errs := make(chan error, len(nodes))
	for _, node := range nodes {
		if m.Node.ID == node.ID {
			continue // Don't execute statement on ourselves.
		}

		wg.Add(1)
		go func(node meta.NodeInfo) {
			defer wg.Done()
			if err := m.nodeExecutor.executeOnNode(stmt, database, &node); err != nil {
				errs <- remoteNodeError{id: node.ID, err: err}
			}
		}(node)
	}

	// Wait for the remote nodes to execute the statement and respond.
	wg.Wait()

	// Return the first error, if any; remaining errors are dropped.
	select {
	case err = <-errs:
		return err
	default:
		return nil
	}
}

// executeOnNode executes a single InfluxQL statement on a single node.
// It serializes the statement into an ExecuteStatementRequest protobuf,
// sends it over a pooled connection using TLV framing, then reads and
// decodes the node's response.
func (m *MetaExecutor) executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error {
	// We're executing on a remote node so establish a connection.
	c, err := m.dial(node.ID)
	if err != nil {
		return err
	}

	// dial only hands out pooled connections; any other concrete type is a
	// programming error, not a runtime fault.
	conn, ok := c.(*pooledConn)
	if !ok {
		panic("wrong connection type in MetaExecutor")
	}
	// Return connection to pool by "closing" it.
	defer conn.Close()

	// Build RPC request.
	var request ExecuteStatementRequest
	request.SetStatement(stmt.String())
	request.SetDatabase(database)

	// Marshal into protocol buffer.
	buf, err := request.MarshalBinary()
	if err != nil {
		return err
	}

	// Send request.
	conn.SetWriteDeadline(time.Now().Add(m.timeout))
	if err := WriteTLV(conn, executeStatementRequestMessage, buf); err != nil {
		// A failed write leaves the connection in an unknown state; mark it
		// unusable so it is not returned to the pool.
		conn.MarkUnusable()
		return err
	}

	// Read the response.
	conn.SetReadDeadline(time.Now().Add(m.timeout))
	_, buf, err = ReadTLV(conn)
	if err != nil {
		conn.MarkUnusable()
		return err
	}

	// Unmarshal response.
	var response ExecuteStatementResponse
	if err := response.UnmarshalBinary(buf); err != nil {
		return err
	}

	// A non-zero code means the remote node failed to execute the statement;
	// surface its code and message to the caller.
	if response.Code() != 0 {
		return fmt.Errorf("error code %d: %s", response.Code(), response.Message())
	}

	return nil
}

// dial returns a pooled connection to a single node in the cluster,
// lazily creating the node's connection pool on first use.
func (m *MetaExecutor) dial(nodeID uint64) (net.Conn, error) {
	// Fast path: the pool for this node already exists.
	m.mu.RLock()
	_, ok := m.pool.getPool(nodeID)
	m.mu.RUnlock()

	if !ok {
		m.mu.Lock()
		// Re-check under the write lock: without this, two concurrent dials
		// for the same node could each create a pool, with the second
		// setPool orphaning the first pool and its connections.
		if _, ok := m.pool.getPool(nodeID); !ok {
			factory := &connFactory{nodeID: nodeID, clientPool: m.pool, timeout: m.timeout}
			factory.metaClient = m.MetaClient

			p, err := NewBoundedPool(1, m.maxConnections, m.timeout, factory.dial)
			if err != nil {
				m.mu.Unlock()
				return nil, err
			}
			m.pool.setPool(nodeID, p)
		}
		m.mu.Unlock()
	}
	return m.pool.conn(nodeID)
}
121 changes: 121 additions & 0 deletions cluster/meta_executor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package cluster

import (
"fmt"
"sync"
"testing"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/services/meta"
)

// Test_ExecuteStatement verifies that ExecuteStatement fans each statement
// out to every remote node exactly once.
func Test_ExecuteStatement(t *testing.T) {
	const numOfNodes = 3

	// With 3 nodes, 2 are remote, so each statement must be executed twice.
	statements := []string{
		"DROP RETENTION POLICY rp0 on foo",
		"DROP DATABASE foo",
	}

	mock := newMockExecutor()
	for _, s := range statements {
		mock.expect(s)
		mock.expect(s)
	}

	e := NewMetaExecutor()
	e.MetaClient = newMockMetaClient(numOfNodes)
	e.Node = influxdb.NewNode("/tmp/node")
	e.Node.ID = 1
	// Replace MetaExecutor's nodeExecutor with our mock so nothing hits
	// the network.
	e.nodeExecutor = mock

	for _, s := range statements {
		if err := e.ExecuteStatement(mustParseStatement(s), "foo"); err != nil {
			t.Fatal(err)
		}
	}

	if err := mock.done(); err != nil {
		t.Fatal(err)
	}
}

// mockExecutor records every statement passed to executeOnNode and checks
// that statements arrive in the expected order.
type mockExecutor struct {
	mu               sync.Mutex
	expectStatements []influxql.Statement // expected statements, in order
	idx              int                  // index of the last statement received; -1 before any call
}

// newMockExecutor returns a mockExecutor with no expectations recorded yet.
func newMockExecutor() *mockExecutor {
	e := new(mockExecutor)
	e.idx = -1 // no statements received yet
	return e
}

// expect appends stmt to the ordered list of statements the mock expects.
func (e *mockExecutor) expect(stmt string) {
	e.expectStatements = append(e.expectStatements, mustParseStatement(stmt))
}

// done reports an error unless exactly the expected number of
// executeOnNode calls were made.
func (e *mockExecutor) done() error {
	want, got := len(e.expectStatements), e.idx+1
	if got != want {
		return fmt.Errorf("expected %d mockExecuteOnNode calls, got %d", want, got)
	}
	return nil
}

// executeOnNode checks stmt against the next expected statement and
// advances the cursor; extra or out-of-order statements are errors.
func (e *mockExecutor) executeOnNode(stmt influxql.Statement, database string, node *meta.NodeInfo) error {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.idx++

	switch {
	case e.idx >= len(e.expectStatements):
		return fmt.Errorf("extra statement: %s", stmt.String())
	case e.expectStatements[e.idx].String() != stmt.String():
		return fmt.Errorf("unexpected statement:\n\texp: %s\n\tgot: %s\n", e.expectStatements[e.idx].String(), stmt.String())
	}
	return nil
}

// mustParseStatement parses stmt, panicking on failure; for test setup only.
func mustParseStatement(stmt string) influxql.Statement {
	parsed, err := influxql.ParseStatement(stmt)
	if err != nil {
		panic(err)
	}
	return parsed
}

// mockMetaClient is a static, in-memory implementation of the MetaClient
// interface used by MetaExecutor.
type mockMetaClient struct {
	nodes []meta.NodeInfo
}

// newMockMetaClient returns a client with nodeCnt nodes whose IDs start at 1
// and whose hosts are sequential localhost ports from 8000 (HTTP) and 9000 (TCP).
func newMockMetaClient(nodeCnt int) *mockMetaClient {
	c := &mockMetaClient{}
	c.nodes = make([]meta.NodeInfo, 0, nodeCnt)
	for i := 1; i <= nodeCnt; i++ {
		c.nodes = append(c.nodes, meta.NodeInfo{
			ID:      uint64(i),
			Host:    fmt.Sprintf("localhost:%d", 7999+i),
			TCPHost: fmt.Sprintf("localhost:%d", 8999+i),
		})
	}

	return c
}

// DataNode returns the node with the given ID, or (nil, nil) when no node
// matches — mirroring the original's zero-value return.
func (c *mockMetaClient) DataNode(id uint64) (ni *meta.NodeInfo, err error) {
	for i := range c.nodes {
		if c.nodes[i].ID == id {
			return &c.nodes[i], nil
		}
	}
	return nil, nil
}

// DataNodes returns the full set of mock data nodes; it never fails.
func (c *mockMetaClient) DataNodes() ([]meta.NodeInfo, error) {
	return c.nodes, nil
}
Loading

0 comments on commit 23ef15b

Please sign in to comment.