Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Graphite TCP should not block system shutdown #4222

Merged
merged 1 commit into from
Sep 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
- [#4165](https://github.com/influxdb/influxdb/pull/4165): Tag all Go runtime stats when writing to internal database.
- [#4118](https://github.com/influxdb/influxdb/issues/4118): Return consistent, correct result for SHOW MEASUREMENTS with multiple AND conditions
- [#4191](https://github.com/influxdb/influxdb/pull/4191): Correctly marshal remote mapper responses. Fixes [#4170](https://github.com/influxdb/influxdb/issues/4170)
- [#4222](https://github.com/influxdb/influxdb/pull/4222): Graphite TCP connections should not block shutdown
- [#4180](https://github.com/influxdb/influxdb/pull/4180): Cursor & SelectMapper Refactor
- [#1577](https://github.com/influxdb/influxdb/issues/1577): selectors (e.g. min, max, first, last) should have equivalents to return the actual point

Expand Down
10 changes: 8 additions & 2 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,18 @@ func (m *Monitor) SetLogger(l *log.Logger) {
}

// RegisterDiagnosticsClient registers a diagnostics client under the given name.
func (m *Monitor) RegisterDiagnosticsClient(name string, client DiagsClient) error {
func (m *Monitor) RegisterDiagnosticsClient(name string, client DiagsClient) {
m.mu.Lock()
defer m.mu.Unlock()
m.diagRegistrations[name] = client
m.Logger.Printf(`'%s' registered for diagnostics monitoring`, name)
return nil
}

// DeregisterDiagnosticsClient removes the diagnostics client registered under
// name. Deleting a name that is not present in the registration map is a no-op.
func (m *Monitor) DeregisterDiagnosticsClient(name string) {
	m.mu.Lock()
	delete(m.diagRegistrations, name)
	m.mu.Unlock()
}

// Statistics returns the combined statistics for all expvar data. The given
Expand Down
134 changes: 68 additions & 66 deletions services/graphite/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,6 @@ const (
leaderWaitTimeout = 30 * time.Second
)

// init allocates the package-level tcpConnections map so that it is non-nil
// before any connection is recorded in it.
func init() {
tcpConnections = make(map[string]*tcpConnectionDiag)
}

// monitorOnce is used in Open so that the "graphite" diagnostics client is
// registered at most once for the whole package.
var monitorOnce sync.Once

// tcpConnectionDiag is the diagnostics record kept for one TCP connection.
type tcpConnectionDiag struct {
conn net.Conn
connectTime time.Time // UTC time at which addConnection recorded the connection
}

// tcpConnectionsMu guards tcpConnections.
var tcpConnectionsMu sync.Mutex
// tcpConnections is the package-level table of tracked connections, keyed by
// the connection's remote address string.
var tcpConnections map[string]*tcpConnectionDiag

// addConnection records c in the package-level connection table under its
// remote address, stamping the entry with the current UTC time.
func addConnection(c net.Conn) {
	tcpConnectionsMu.Lock()
	defer tcpConnectionsMu.Unlock()

	remote := c.RemoteAddr().String()
	entry := &tcpConnectionDiag{conn: c, connectTime: time.Now().UTC()}
	tcpConnections[remote] = entry
}
// removeConnection drops the entry for c, looked up by its remote address,
// from the package-level connection table.
func removeConnection(c net.Conn) {
	tcpConnectionsMu.Lock()
	defer tcpConnectionsMu.Unlock()

	remote := c.RemoteAddr().String()
	delete(tcpConnections, remote)
}

// handleDiagnostics builds a diagnostic table with one row per connection in
// the package-level connection table. Each row holds the local address, the
// remote address and the connect time, matching the "local", "remote" and
// "connect time" columns. The returned error is always nil.
func handleDiagnostics() (*monitor.Diagnostic, error) {
	tcpConnectionsMu.Lock()
	defer tcpConnectionsMu.Unlock()

	d := &monitor.Diagnostic{
		Columns: []string{"local", "remote", "connect time"},
		Rows:    make([][]interface{}, 0, len(tcpConnections)),
	}
	// Rows follow map iteration order, which Go leaves unspecified.
	for _, v := range tcpConnections {
		d.Rows = append(d.Rows, []interface{}{
			v.conn.LocalAddr().String(),
			v.conn.RemoteAddr().String(),
			v.connectTime,
		})
	}
	return d, nil
}

// statistics gathered by the graphite package.
const (
statPointsReceived = "points_rx"
Expand All @@ -82,6 +37,15 @@ const (
statConnectionsHandled = "connections_handled"
)

// tcpConnection pairs a TCP connection with the time it was recorded.
type tcpConnection struct {
conn net.Conn
connectTime time.Time // set to time.Now().UTC() when the connection is tracked
}

// Close closes the underlying connection. The error returned by the
// connection's Close is discarded.
func (c *tcpConnection) Close() {
c.conn.Close()
}

type Service struct {
bindAddress string
database string
Expand All @@ -94,8 +58,10 @@ type Service struct {
batcher *tsdb.PointBatcher
parser *Parser

logger *log.Logger
statMap *expvar.Map
logger *log.Logger
statMap *expvar.Map
tcpConnectionsMu sync.Mutex
tcpConnections map[string]*tcpConnection

ln net.Listener
addr net.Addr
Expand All @@ -105,7 +71,8 @@ type Service struct {
done chan struct{}

Monitor interface {
RegisterDiagnosticsClient(name string, client monitor.DiagsClient) error
RegisterDiagnosticsClient(name string, client monitor.DiagsClient)
DeregisterDiagnosticsClient(name string)
}
PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
Expand All @@ -122,14 +89,15 @@ func NewService(c Config) (*Service, error) {
d := c.WithDefaults()

s := Service{
bindAddress: d.BindAddress,
database: d.Database,
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
done: make(chan struct{}),
bindAddress: d.BindAddress,
database: d.Database,
protocol: d.Protocol,
batchSize: d.BatchSize,
batchPending: d.BatchPending,
batchTimeout: time.Duration(d.BatchTimeout),
logger: log.New(os.Stderr, "[graphite] ", log.LstdFlags),
tcpConnections: make(map[string]*tcpConnection),
done: make(chan struct{}),
}

consistencyLevel, err := cluster.ParseConsistencyLevel(d.ConsistencyLevel)
Expand Down Expand Up @@ -161,14 +129,10 @@ func (s *Service) Open() error {
tags := map[string]string{"proto": s.protocol, "bind": s.bindAddress}
s.statMap = influxdb.NewStatistics(key, "graphite", tags)

// One Graphite service hooks up diagnostics for all Graphite functionality.
monitorOnce.Do(func() {
if s.Monitor == nil {
s.logger.Println("no monitor service available, no monitoring will be performed")
return
}
s.Monitor.RegisterDiagnosticsClient("graphite", monitor.DiagsClientFunc(handleDiagnostics))
})
// Register diagnostics if a Monitor service is available.
if s.Monitor != nil {
s.Monitor.RegisterDiagnosticsClient(key, s)
}

if err := s.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
s.logger.Printf("Failed to detect a cluster leader: %s", err.Error())
Expand Down Expand Up @@ -202,9 +166,18 @@ func (s *Service) Open() error {
s.logger.Printf("Listening on %s: %s", strings.ToUpper(s.protocol), s.addr.String())
return nil
}
// closeAllConnections closes every TCP connection currently held in the
// service's connection table while holding the table lock. Entries are not
// removed from the table here.
func (s *Service) closeAllConnections() {
	s.tcpConnectionsMu.Lock()
	defer s.tcpConnectionsMu.Unlock()

	for remote := range s.tcpConnections {
		s.tcpConnections[remote].Close()
	}
}

// Close stops all data processing on the Graphite input.
func (s *Service) Close() error {
s.closeAllConnections()

if s.ln != nil {
s.ln.Close()
}
Expand Down Expand Up @@ -262,11 +235,11 @@ func (s *Service) openTCPServer() (net.Addr, error) {
func (s *Service) handleTCPConnection(conn net.Conn) {
defer s.wg.Done()
defer conn.Close()
defer removeConnection(conn)
defer s.statMap.Add(statConnectionsActive, -1)
addConnection(conn)
defer s.untrackConnection(conn)
s.statMap.Add(statConnectionsActive, 1)
s.statMap.Add(statConnectionsHandled, 1)
s.trackConnection(conn)

reader := bufio.NewReader(conn)

Expand All @@ -286,6 +259,20 @@ func (s *Service) handleTCPConnection(conn net.Conn) {
}
}

// trackConnection records c in the service's connection table under its
// remote address, stamping the entry with the current UTC time.
func (s *Service) trackConnection(c net.Conn) {
	s.tcpConnectionsMu.Lock()
	defer s.tcpConnectionsMu.Unlock()

	remote := c.RemoteAddr().String()
	entry := &tcpConnection{conn: c, connectTime: time.Now().UTC()}
	s.tcpConnections[remote] = entry
}
// untrackConnection drops the entry for c, looked up by its remote address,
// from the service's connection table.
func (s *Service) untrackConnection(c net.Conn) {
	s.tcpConnectionsMu.Lock()
	defer s.tcpConnectionsMu.Unlock()

	remote := c.RemoteAddr().String()
	delete(s.tcpConnections, remote)
}

// openUDPServer opens the Graphite input in UDP mode and starts processing incoming data.
func (s *Service) openUDPServer() (net.Addr, error) {
addr, err := net.ResolveUDPAddr("udp", s.bindAddress)
Expand Down Expand Up @@ -370,3 +357,18 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) {
}
}
}

// Diagnostics builds a diagnostic table with one row per TCP connection
// currently tracked by the service. Each row holds the local address, the
// remote address and the connect time, matching the "local", "remote" and
// "connect time" columns. The returned error is always nil.
//
// NOTE(review): presumably this is the method that lets a *Service be passed
// as a monitor.DiagsClient in Open; confirm against the monitor package.
func (s *Service) Diagnostics() (*monitor.Diagnostic, error) {
	s.tcpConnectionsMu.Lock()
	defer s.tcpConnectionsMu.Unlock()

	d := &monitor.Diagnostic{
		Columns: []string{"local", "remote", "connect time"},
		Rows:    make([][]interface{}, 0, len(s.tcpConnections)),
	}
	// Rows follow map iteration order, which Go leaves unspecified.
	for _, v := range s.tcpConnections {
		d.Rows = append(d.Rows, []interface{}{
			v.conn.LocalAddr().String(),
			v.conn.RemoteAddr().String(),
			v.connectTime,
		})
	}
	return d, nil
}