From d71785477f3d528ab02d36e4537945afb1447094 Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 21 Apr 2015 11:17:34 -0600 Subject: [PATCH] Make drop database close and release resources Drop database did not close any open shard files or close any topic reader/heartbeats. In the tests, we create and drop new databases during each test run so these were open files and connection slowed things down and consumed a lot of RAM as the tests progressed. --- client/influxdb.go | 2 +- messaging/client.go | 37 +++++++++++++++++++++++++++++++------ server.go | 31 +++++++++++++++++++++++++++++++ test/messaging.go | 18 ++++++++++++++++++ 4 files changed, 81 insertions(+), 7 deletions(-) diff --git a/client/influxdb.go b/client/influxdb.go index 43ae9e64880..b7e2605db4c 100644 --- a/client/influxdb.go +++ b/client/influxdb.go @@ -45,7 +45,7 @@ func NewClient(c Config) (*Client, error) { url: c.URL, username: c.Username, password: c.Password, - httpClient: &http.Client{}, + httpClient: http.DefaultClient, userAgent: c.UserAgent, } if client.userAgent == "" { diff --git a/messaging/client.go b/messaging/client.go index edfcb7e3c07..904650e6e63 100644 --- a/messaging/client.go +++ b/messaging/client.go @@ -34,11 +34,11 @@ const ( // Client represents a client for the broker's HTTP API. type Client struct { mu sync.Mutex - path string // config file path - conns []*Conn // all connections opened by client - url url.URL // current known leader URL - urls []url.URL // list of available broker URLs - dataURL url.URL // URL of the client's data node + path string // config file path + conns map[uint64]*Conn // all connections opened by client + url url.URL // current known leader URL + urls []url.URL // list of available broker URLs + dataURL url.URL // URL of the client's data node opened bool @@ -61,6 +61,7 @@ func NewClient(dataURL url.URL) *Client { ReconnectTimeout: DefaultReconnectTimeout, PingInterval: DefaultPingInterval, dataURL: dataURL, + conns: map[uint64]*Conn{}, } return c } @@ -353,12 +354,32 @@ func (c *Client) Conn(topicID uint64) *Conn { conn := NewConn(topicID, &c.dataURL) conn.SetURL(c.url) + if _, ok := c.conns[topicID]; ok { + panic(fmt.Sprintf("connection for topic %d already exists", topicID)) + } // Add to list of client connections. - c.conns = append(c.conns, conn) + c.conns[topicID] = conn return conn } +// CloseConn closes the connection to the broker for a given topic +func (c *Client) CloseConn(topicID uint64) error { + c.mu.Lock() + defer c.mu.Unlock() + + if conn, ok := c.conns[topicID]; ok && conn != nil { + if err := conn.Close(); err != nil { + return err + } + + // Add to list of client connections. + delete(c.conns, topicID) + } + + return nil +} + // pinger periodically pings the broker to check that it is alive. func (c *Client) pinger(closing chan struct{}) { defer c.wg.Done() @@ -439,6 +460,10 @@ type Conn struct { // NewConn returns a new connection to the broker for a topic. func NewConn(topicID uint64, dataURL *url.URL) *Conn { + warn("new connection", topicID, dataURL.String()) + //if topicID == 4 { + // panic("foo") + //} return &Conn{ topicID: topicID, dataURL: *dataURL, diff --git a/server.go b/server.go index 16fcb05a24c..b26918b9d70 100644 --- a/server.go +++ b/server.go @@ -1015,8 +1015,38 @@ func (s *Server) applyDropDatabase(m *messaging.Message) (err error) { // Remove from metastore. err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { return tx.dropDatabase(c.Name) }) + db := s.databases[c.Name] + for _, rp := range db.policies { + for _, sg := range rp.shardGroups { + for _, sh := range sg.Shards { + + // if we have this shard locally, close and remove it + if sh.store != nil { + // close topic readers/heartbeaters/etc. connections + err := s.client.CloseConn(sh.ID) + if err != nil { + panic(err) + } + + err = sh.close() + if err != nil { + panic(err) + } + + err = os.Remove(s.shardPath(sh.ID)) + if err != nil { + panic(err) + } + } + + delete(s.shards, sh.ID) + } + } + } + // Delete the database entry. delete(s.databases, c.Name) + return } @@ -3518,6 +3548,7 @@ type MessagingClient interface { // Conn returns an open, streaming connection to a topic. Conn(topicID uint64) MessagingConn + CloseConn(topicID uint64) error } type messagingClient struct { diff --git a/test/messaging.go b/test/messaging.go index 8a9e7742f79..7633704db34 100644 --- a/test/messaging.go +++ b/test/messaging.go @@ -90,6 +90,24 @@ func (c *MessagingClient) Conn(topicID uint64) influxdb.MessagingConn { return c.ConnFunc(topicID) } +func (c *MessagingClient) CloseConn(topicID uint64) error { + c.mu.Lock() + defer c.mu.Unlock() + + conns := []*MessagingConn{} + for _, conn := range c.conns { + if conn.topicID == topicID { + if err := conn.Close(); err != nil { + return err + } + continue + } + conns = append(conns, conn) + } + c.conns = conns + return nil +} + // DefaultConnFunc returns a connection for a specific topic. func (c *MessagingClient) DefaultConnFunc(topicID uint64) influxdb.MessagingConn { c.mu.Lock()