Skip to content

Commit

Permalink
Make drop database close and release resources
Browse files Browse the repository at this point in the history
Drop database did not close any open shard files or close
any topic reader/heartbeats.  In the tests, we create and drop new
databases during each test run so these were open files and connection
slowed things down and consumed a lot of RAM as the tests progressed.
  • Loading branch information
jwilder committed Apr 21, 2015
1 parent da7211a commit d717854
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 7 deletions.
2 changes: 1 addition & 1 deletion client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func NewClient(c Config) (*Client, error) {
url: c.URL,
username: c.Username,
password: c.Password,
httpClient: &http.Client{},
httpClient: http.DefaultClient,
userAgent: c.UserAgent,
}
if client.userAgent == "" {
Expand Down
37 changes: 31 additions & 6 deletions messaging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ const (
// Client represents a client for the broker's HTTP API.
type Client struct {
mu sync.Mutex
path string // config file path
conns []*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node
path string // config file path
conns map[uint64]*Conn // all connections opened by client
url url.URL // current known leader URL
urls []url.URL // list of available broker URLs
dataURL url.URL // URL of the client's data node

opened bool

Expand All @@ -61,6 +61,7 @@ func NewClient(dataURL url.URL) *Client {
ReconnectTimeout: DefaultReconnectTimeout,
PingInterval: DefaultPingInterval,
dataURL: dataURL,
conns: map[uint64]*Conn{},
}
return c
}
Expand Down Expand Up @@ -353,12 +354,32 @@ func (c *Client) Conn(topicID uint64) *Conn {
conn := NewConn(topicID, &c.dataURL)
conn.SetURL(c.url)

if _, ok := c.conns[topicID]; ok {
panic(fmt.Sprintf("connection for topic %d already exists", topicID))
}
// Add to list of client connections.
c.conns = append(c.conns, conn)
c.conns[topicID] = conn

return conn
}

// CloseConn closes the connection to the broker for a given topic
func (c *Client) CloseConn(topicID uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

if conn, ok := c.conns[topicID]; ok && conn != nil {
if err := conn.Close(); err != nil {
return err
}

// Add to list of client connections.
delete(c.conns, topicID)
}

return nil
}

// pinger periodically pings the broker to check that it is alive.
func (c *Client) pinger(closing chan struct{}) {
defer c.wg.Done()
Expand Down Expand Up @@ -439,6 +460,10 @@ type Conn struct {

// NewConn returns a new connection to the broker for a topic.
func NewConn(topicID uint64, dataURL *url.URL) *Conn {
warn("new connection", topicID, dataURL.String())
//if topicID == 4 {
// panic("foo")
//}
return &Conn{
topicID: topicID,
dataURL: *dataURL,
Expand Down
31 changes: 31 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1015,8 +1015,38 @@ func (s *Server) applyDropDatabase(m *messaging.Message) (err error) {
// Remove from metastore.
err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { return tx.dropDatabase(c.Name) })

db := s.databases[c.Name]
for _, rp := range db.policies {
for _, sg := range rp.shardGroups {
for _, sh := range sg.Shards {

// if we have this shard locally, close and remove it
if sh.store != nil {
// close topic readers/heartbeaters/etc. connections
err := s.client.CloseConn(sh.ID)
if err != nil {
panic(err)
}

err = sh.close()
if err != nil {
panic(err)
}

err = os.Remove(s.shardPath(sh.ID))
if err != nil {
panic(err)
}
}

delete(s.shards, sh.ID)
}
}
}

// Delete the database entry.
delete(s.databases, c.Name)

return
}

Expand Down Expand Up @@ -3518,6 +3548,7 @@ type MessagingClient interface {

// Conn returns an open, streaming connection to a topic.
Conn(topicID uint64) MessagingConn
CloseConn(topicID uint64) error
}

type messagingClient struct {
Expand Down
18 changes: 18 additions & 0 deletions test/messaging.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,24 @@ func (c *MessagingClient) Conn(topicID uint64) influxdb.MessagingConn {
return c.ConnFunc(topicID)
}

func (c *MessagingClient) CloseConn(topicID uint64) error {
c.mu.Lock()
defer c.mu.Unlock()

conns := []*MessagingConn{}
for _, conn := range c.conns {
if conn.topicID == topicID {
if err := conn.Close(); err != nil {
return err
}
continue
}
conns = append(conns, conn)
}
c.conns = conns
return nil
}

// DefaultConnFunc returns a connection for a specific topic.
func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn {
c.mu.Lock()
Expand Down

0 comments on commit d717854

Please sign in to comment.