
Commit

Merge pull request #2353 from influxdb/2272-fix
Distributed Query/Clustering Fixes
jwilder committed Apr 21, 2015
2 parents e3fcdd2 + 2975a9d commit fbf9cdb
Showing 9 changed files with 111 additions and 33 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
### Features
- [#2301](https://github.com/influxdb/influxdb/pull/2301): Distributed query load balancing and failover
- [#2336](https://github.com/influxdb/influxdb/pull/2336): Handle distributed queries when shards != data nodes
- [#2353](https://github.com/influxdb/influxdb/pull/2353): Distributed Query/Clustering Fixes

### Bugfixes
- [#2297](https://github.com/influxdb/influxdb/pull/2297): create /var/run during startup. Thanks @neonstalwart.
@@ -18,6 +19,10 @@
- [#2338](https://github.com/influxdb/influxdb/pull/2338): Fix panic if tag key isn't double quoted when it should have been
- [#2340](https://github.com/influxdb/influxdb/pull/2340): Fix SHOW DIAGNOSTICS panic if any shard was non-local.
- [#2351](https://github.com/influxdb/influxdb/pull/2351): Fix data race by rlocking shard during diagnostics.
- [#2348](https://github.com/influxdb/influxdb/pull/2348): Data node fails to join cluster in 0.9.0rc25
- [#2343](https://github.com/influxdb/influxdb/pull/2343): Node falls behind Metastore updates
- [#2334](https://github.com/influxdb/influxdb/pull/2334): Test Partial replication is very problematic
- [#2272](https://github.com/influxdb/influxdb/pull/2272): clustering: influxdb 0.9.0-rc23 panics when doing a GET with merge_metrics in a…

## v0.9.0-rc25 [2015-04-15]

2 changes: 1 addition & 1 deletion client/influxdb.go
@@ -45,7 +45,7 @@ func NewClient(c Config) (*Client, error) {
url: c.URL,
username: c.Username,
password: c.Password,
httpClient: &http.Client{},
httpClient: http.DefaultClient,
userAgent: c.UserAgent,
}
if client.userAgent == "" {
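
The client change above swaps a freshly allocated `&http.Client{}` for the shared `http.DefaultClient`. A minimal sketch (not from this commit) of what that buys: every client now funnels requests through one `http.Client`, whose nil transport falls back to the shared `http.DefaultTransport` and its keep-alive connection pool.

```go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	a, b := http.DefaultClient, http.DefaultClient

	fmt.Println(a == b)             // true: one shared *http.Client
	fmt.Println(a.Transport == nil) // true: requests fall back to http.DefaultTransport
}
```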
32 changes: 15 additions & 17 deletions cmd/influxd/run.go
@@ -642,30 +642,18 @@ func (cmd *RunCommand) openServer(joinURLs []url.URL) *influxdb.Server {
// Give brokers time to elect a leader if entire cluster is being restarted.
time.Sleep(1 * time.Second)

if s.ID() == 0 && s.Index() == 0 {
if len(joinURLs) > 0 {
joinServer(s, *cmd.node.ClusterURL(), joinURLs)
return s
}

if err := s.Initialize(*cmd.node.ClusterURL()); err != nil {
log.Fatalf("server initialization error(0): %s", err)
}

u := cmd.node.ClusterURL()
log.Printf("initialized data node: %s\n", u.String())
return s
if s.ID() == 0 {
joinOrInitializeServer(s, *cmd.node.ClusterURL(), joinURLs)
} else {
log.Printf("data node already member of cluster. Using existing state and ignoring join URLs")
}

return s
}

// joins a server to an existing cluster.
func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
// TODO: Use separate broker and data join urls.

// joinOrInitializeServer joins a new server to an existing cluster or initializes it as the first
// member of the cluster
func joinOrInitializeServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
// Create data node on an existing data node.
for _, joinURL := range joinURLs {
if err := s.Join(&u, &joinURL); err == influxdb.ErrDataNodeNotFound {
@@ -676,12 +664,22 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
log.Printf("initialized data node: %s\n", (&u).String())
return
} else if err != nil {
// does not return so that the next joinURL can be tried
log.Printf("join: failed to connect data node: %s: %s", (&u).String(), err)
} else {
log.Printf("join: connected data node to %s", u)
return
}
}

if len(joinURLs) == 0 {
if err := s.Initialize(u); err != nil {
log.Fatalf("server initialization error(2): %s", err)
}
log.Printf("initialized data node: %s\n", (&u).String())
return
}

log.Fatalf("join: failed to connect data node to any specified server")
}

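
The rewritten startup path is easier to follow with the influxdb-specific types stripped out. A condensed paraphrase (a sketch only; `tryJoin` and `initialize` are illustrative stand-ins for `s.Join` and `s.Initialize`, and the `ErrDataNodeNotFound` retry is omitted):

```go
package main

import (
	"errors"
	"log"
)

// Illustrative stand-ins for s.Join and s.Initialize.
func tryJoin(self, joinURL string) error { return errors.New("unreachable") }
func initialize(self string) error       { log.Printf("initialized data node: %s", self); return nil }

func joinOrInitialize(self string, joinURLs []string) error {
	for _, u := range joinURLs {
		if err := tryJoin(self, u); err != nil {
			// Log and fall through so the next join URL can be tried.
			log.Printf("join: failed to connect data node: %s: %s", u, err)
			continue
		}
		log.Printf("join: connected data node to %s", u)
		return nil
	}
	if len(joinURLs) == 0 {
		// No join URLs given: this node bootstraps the cluster.
		return initialize(self)
	}
	return errors.New("join: failed to connect data node to any specified server")
}

func main() {
	if err := joinOrInitialize("http://localhost:8086", nil); err != nil {
		log.Fatal(err)
	}
}
```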
11 changes: 6 additions & 5 deletions cmd/influxd/server_integration_test.go
@@ -1458,9 +1458,10 @@ func Test3NodeServerFailover(t *testing.T) {
}

// ensure that all queries work if there are more nodes in a cluster than the replication factor
func Test3NodeClusterPartiallyReplicated(t *testing.T) {
// and there is more than 1 shard
func Test5NodeClusterPartiallyReplicated(t *testing.T) {
t.Parallel()
testName := "3-node server integration partial replication"
testName := "5-node server integration partial replication"
if testing.Short() {
t.Skip(fmt.Sprintf("skipping '%s'", testName))
}
@@ -1469,11 +1470,11 @@ func Test3NodeClusterPartiallyReplicated(t *testing.T) {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 3, nil)
nodes := createCombinedNodeCluster(t, testName, dir, 5, nil)
defer nodes.Close()

runTestsData(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", len(nodes)-1)
runTestsData(t, testName, nodes, "mydb", "myrp", 2)
runTest_rawDataReturnsInOrder(t, testName, nodes, "mydb", "myrp", 2)
}

func TestClientLibrary(t *testing.T) {
5 changes: 4 additions & 1 deletion influxql/engine.go
@@ -276,8 +276,11 @@ func (m *MapReduceJob) processRawQuery(out chan *Row, filterEmptyResults bool) {
// add the values up to the index to the output
values = append(values, o[:ind]...)

// clear out previously sent mapper output data
mapperOutputs[j] = mapperOutputs[j][ind:]

// if we emptied out all the values, set this output to nil so that the mapper will get run again on the next loop
if ind == len(o) {
if len(mapperOutputs[j]) == 0 {
mapperOutputs[j] = nil
}
}
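
The engine.go fix is subtle: after a partial send, the mapper's buffered output was never trimmed, so values before `ind` could be emitted twice, and the emptiness check ran against the stale slice. A standalone sketch of the corrected pattern (plain `int` slices stand in for mapper output):

```go
package main

import "fmt"

func main() {
	buf := []int{1, 2, 3, 4} // buffered output from one mapper
	ind := 2                 // first 2 values are ready to send

	sent := append([]int(nil), buf[:ind]...) // emit the values up to the index

	// The fix: clear out previously sent data...
	buf = buf[ind:]
	// ...and test emptiness on the trimmed buffer, not the original slice.
	if len(buf) == 0 {
		buf = nil // a nil buffer means the mapper is run again next loop
	}

	fmt.Println(sent, buf) // [1 2] [3 4]
}
```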
32 changes: 26 additions & 6 deletions messaging/client.go
@@ -34,11 +34,11 @@ const (
// Client represents a client for the broker's HTTP API.
type Client struct {
mu sync.Mutex
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node
path string // config file path
conns map[uint64]*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node

opened bool

Expand All @@ -61,6 +61,7 @@ func NewClient(dataURL url.URL) *Client {
ReconnectTimeout: DefaultReconnectTimeout,
PingInterval: DefaultPingInterval,
dataURL: dataURL,
conns: map[uint64]*Conn{},
}
return c
}
@@ -353,12 +354,31 @@ func (c *Client) Conn(topicID uint64) *Conn {
conn := NewConn(topicID, &c.dataURL)
conn.SetURL(c.url)

if _, ok := c.conns[topicID]; ok {
panic(fmt.Sprintf("connection for topic %d already exists", topicID))
}
// Add to list of client connections.
c.conns = append(c.conns, conn)
c.conns[topicID] = conn

return conn
}

// CloseConn closes the connection to the broker for a given topic
func (c *Client) CloseConn(topicID uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

if conn, ok := c.conns[topicID]; ok && conn != nil {
if err := conn.Close(); err != nil {
return err
}

delete(c.conns, topicID)
}

return nil
}

// pinger periodically pings the broker to check that it is alive.
func (c *Client) pinger(closing chan struct{}) {
defer c.wg.Done()
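
With connections now keyed by topic ID in a map, a single topic's connection can be looked up and torn down. A hypothetical usage sketch (the data URL and topic ID are illustrative; note that `Conn` panics if the topic already has a registered connection):

```go
package main

import (
	"log"
	"net/url"

	"github.com/influxdb/influxdb/messaging"
)

func main() {
	u, err := url.Parse("http://localhost:8086") // illustrative data node URL
	if err != nil {
		log.Fatal(err)
	}

	c := messaging.NewClient(*u)

	conn := c.Conn(42) // create and register the connection for topic 42
	_ = conn           // ... attach readers, stream messages, etc. ...

	// Close only this topic's connection; the map entry is deleted, so a
	// later Conn(42) re-registers without tripping the duplicate panic.
	if err := c.CloseConn(42); err != nil {
		log.Fatal(err)
	}
}
```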
32 changes: 32 additions & 0 deletions server.go
@@ -1015,8 +1015,38 @@ func (s *Server) applyDropDatabase(m *messaging.Message) (err error) {
// Remove from metastore.
err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { return tx.dropDatabase(c.Name) })

db := s.databases[c.Name]
for _, rp := range db.policies {
for _, sg := range rp.shardGroups {
for _, sh := range sg.Shards {

// if we have this shard locally, close and remove it
if sh.store != nil {
// close topic readers/heartbeaters/etc. connections
err := s.client.CloseConn(sh.ID)
if err != nil {
panic(err)
}

err = sh.close()
if err != nil {
panic(err)
}

err = os.Remove(s.shardPath(sh.ID))
if err != nil {
panic(err)
}
}

delete(s.shards, sh.ID)
}
}
}

// Delete the database entry.
delete(s.databases, c.Name)

return
}

@@ -3157,6 +3187,7 @@ func (s *Server) StartLocalMapper(rm *RemoteMapper) (*LocalMapper, error) {
selectFields: rm.SelectFields,
selectTags: rm.SelectTags,
interval: rm.Interval,
tmin: rm.TMin,
tmax: rm.TMax,
limit: limit,
}
@@ -3517,6 +3548,7 @@ type MessagingClient interface {

// Conn returns an open, streaming connection to a topic.
Conn(topicID uint64) MessagingConn
CloseConn(topicID uint64) error
}

type messagingClient struct {
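
Two small server.go changes are easy to miss: dropping a database now also closes each local shard's topic connection (via the new `CloseConn`) before deleting its files, and `StartLocalMapper` finally copies the remote mapper's lower time bound. Without the one-line `tmin: rm.TMin`, a mapper built for a remote request kept a zero lower bound, as this toy sketch illustrates (types simplified, not the actual LocalMapper):

```go
package main

import "fmt"

// mapper stands in for LocalMapper; only the time bounds matter here.
type mapper struct{ tmin, tmax int64 }

func main() {
	rmTMin, rmTMax := int64(1000), int64(2000) // bounds from the RemoteMapper request

	before := mapper{tmax: rmTMax}              // pre-fix: tmin never propagated
	after := mapper{tmin: rmTMin, tmax: rmTMax} // post-fix: both bounds copied

	fmt.Println(before, after) // {0 2000} {1000 2000}
}
```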
18 changes: 18 additions & 0 deletions test/messaging.go
@@ -90,6 +90,24 @@ func (c *MessagingClient) Conn(topicID uint64) influxdb.MessagingConn {
return c.ConnFunc(topicID)
}

func (c *MessagingClient) CloseConn(topicID uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

conns := []*MessagingConn{}
for _, conn := range c.conns {
if conn.topicID == topicID {
if err := conn.Close(); err != nil {
return err
}
continue
}
conns = append(conns, conn)
}
c.conns = conns
return nil
}

// DefaultConnFunc returns a connection for a specific topic.
func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn {
c.mu.Lock()
7 changes: 4 additions & 3 deletions tx.go
@@ -149,7 +149,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
shards[shard] = append(shards[shard], sid)
}

for shard, _ := range shards {
for shard, sids := range shards {
var mapper influxql.Mapper

// create either a remote or local mapper for this shard
@@ -167,7 +167,7 @@ func (tx *tx) CreateMapReduceJobs(stmt *influxql.SelectStatement, tagKeys []stri
MeasurementName: m.Name,
TMin: tmin.UnixNano(),
TMax: tmax.UnixNano(),
SeriesIDs: t.SeriesIDs,
SeriesIDs: sids,
ShardID: shard.ID,
WhereFields: whereFields,
SelectFields: selectFields,
@@ -179,14 +179,15 @@
mapper.(*RemoteMapper).SetFilters(t.Filters)
} else {
mapper = &LocalMapper{
seriesIDs: t.SeriesIDs,
seriesIDs: sids,
db: shard.store,
job: job,
decoder: NewFieldCodec(m),
filters: t.Filters,
whereFields: whereFields,
selectFields: selectFields,
selectTags: selectTags,
tmin: tmin.UnixNano(),
tmax: tmax.UnixNano(),
interval: interval,
// multiple mappers may need to be merged together to get the results
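The tx.go change fixes a fan-out bug: every shard's mapper used to receive the measurement's full series-ID list (`t.SeriesIDs`) rather than only the IDs resident on that shard (the `sids` bucket built in the loop above). A standalone sketch of the bucketing (plain types; `shardFor` is an illustrative lookup):

```go
package main

import "fmt"

// shardFor is an illustrative series-to-shard lookup.
func shardFor(sid uint64) string { return fmt.Sprintf("shard-%d", sid%2) }

func main() {
	all := []uint64{1, 2, 3, 4} // t.SeriesIDs: every series in the measurement

	// Bucket series IDs by owning shard, as CreateMapReduceJobs does.
	shards := map[string][]uint64{}
	for _, sid := range all {
		s := shardFor(sid)
		shards[s] = append(shards[s], sid)
	}

	// Each mapper now gets only its shard's bucket, not the full list.
	for shard, sids := range shards {
		fmt.Println(shard, sids) // shard-0 [2 4] / shard-1 [1 3]
	}
}
```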
