Skip to content

Commit

Permalink
Add UDP service back
Browse files Browse the repository at this point in the history
  • Loading branch information
renan-strauss committed Jun 3, 2015
1 parent 9944678 commit f0dd608
Show file tree
Hide file tree
Showing 9 changed files with 252 additions and 2 deletions.
2 changes: 2 additions & 0 deletions cmd/influxd/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/monitor"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)

Expand Down Expand Up @@ -77,6 +78,7 @@ type Config struct {
Graphites []graphite.Config `toml:"graphite"`
Collectd collectd.Config `toml:"collectd"`
OpenTSDB opentsdb.Config `toml:"opentsdb"`
UDP udp.Config `toml:"udp"`

// Snapshot SnapshotConfig `toml:"snapshot"`
Monitoring monitor.Config `toml:"monitoring"`
Expand Down
5 changes: 5 additions & 0 deletions cmd/influxd/run/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ bind-address = ":1000"
[opentsdb]
bind-address = ":2000"
[udp]
bind-address = ":4444"
[monitoring]
enabled = true
Expand Down Expand Up @@ -81,6 +84,8 @@ enabled = true
t.Fatalf("unexpected collectd bind address: %s", c.Collectd.BindAddress)
} else if c.OpenTSDB.BindAddress != ":2000" {
t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress)
} else if c.UDP.BindAddress != ":4444" {
t.Fatalf("unexpected udp bind address: %s", c.UDP.BindAddress)
} else if c.Monitoring.Enabled != true {
t.Fatalf("unexpected monitoring enabled: %v", c.Monitoring.Enabled)
} else if c.ContinuousQuery.Enabled != true {
Expand Down
8 changes: 8 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/influxdb/influxdb/services/graphite"
"github.com/influxdb/influxdb/services/httpd"
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tsdb"
)

Expand Down Expand Up @@ -55,6 +56,7 @@ func NewServer(c *Config, joinURLs string) *Server {
s.appendHTTPDService(c.HTTPD)
s.appendCollectdService(c.Collectd)
s.appendOpenTSDBService(c.OpenTSDB)
s.appendUDPService(c.UDP)
for _, g := range c.Graphites {
s.appendGraphiteService(g)
}
Expand Down Expand Up @@ -96,6 +98,12 @@ func (s *Server) appendGraphiteService(c graphite.Config) {
s.Services = append(s.Services, srv)
}

func (s *Server) appendUDPService(c udp.Config) {
srv := udp.NewService(c)
srv.Server.PointsWriter = s.PointsWriter
s.Services = append(s.Services, srv)
}

// Open opens the meta and data store and all services.
func (s *Server) Open() error {
if err := func() error {
Expand Down
3 changes: 3 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1010,6 +1010,9 @@ func NewConfig() *run.Config {
c.HTTPD.Enabled = true
c.HTTPD.BindAddress = "127.0.0.1:0"
c.HTTPD.LogEnabled = testing.Verbose()

c.UDP.BindAddress = "127.0.0.1:0"
c.UDP.Database = "db0"
return c
}

Expand Down
6 changes: 4 additions & 2 deletions etc/config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,10 @@ enabled = false
# Configure UDP listener for series data.
[udp]
enabled = false
# bind-address = "0.0.0.0"
# port = 4444
# bind-address = ":4444"
# database = "udp_database"
# batch-size = 0 # How many points to batch up internally before writing.
# batch-timeout = "0ms" # Maximum time to wait before sending batch, regardless of current size.

# Broker configuration. Brokers are nodes which participate in distributed
# consensus.
Expand Down
12 changes: 12 additions & 0 deletions services/udp/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package udp

import "github.com/influxdb/influxdb/toml"

type Config struct {
Enabled bool `toml:"enabled"`
BindAddress string `toml:"bind-address"`

Database string `toml:"database"`
BatchSize int `toml:"batch-size"`
BatchTimeout toml.Duration `toml:"batch-timeout"`
}
36 changes: 36 additions & 0 deletions services/udp/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package udp_test

import (
"testing"
"time"

"github.com/BurntSushi/toml"
"github.com/influxdb/influxdb/services/udp"
)

func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c udp.Config
if _, err := toml.Decode(`
enabled = true
bind-address = ":4444"
database = "awesomedb"
batch-size = 100
batch-timeout = "10ms"
`, &c); err != nil {
t.Fatal(err)
}

// Validate configuration.
if c.Enabled != true {
t.Fatalf("unexpected enabled: %v", c.Enabled)
} else if c.BindAddress != ":4444" {
t.Fatalf("unexpected bind address: %s", c.BindAddress)
} else if c.Database != "awesomedb" {
t.Fatalf("unexpected database: %s", c.Database)
} else if c.BatchSize != 100 {
t.Fatalf("unexpected batch size: %d", c.BatchSize)
} else if time.Duration(c.BatchTimeout) != (10 * time.Millisecond) {
t.Fatalf("unexpected batch timeout: %v", c.BatchTimeout)
}
}
35 changes: 35 additions & 0 deletions services/udp/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package udp

import (
"net"
"time"
)

type Service struct {
Server *Server

addr string
}

func NewService(c Config) *Service {
server := NewServer(c.Database)
server.SetBatchSize(c.BatchSize)
server.SetBatchTimeout(time.Duration(c.BatchTimeout))

return &Service{
addr: c.BindAddress,
Server: server,
}
}

func (s *Service) Open() error {
return s.Server.ListenAndServe(s.addr)
}

func (s *Service) Close() error {
return s.Server.Close()
}

func (s *Service) Addr() net.Addr {
return s.Server.Addr()
}
147 changes: 147 additions & 0 deletions services/udp/udp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package udp

import (
"errors"
"log"
"net"
"os"
"sync"
"time"

"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/tsdb"
)

const (
UDPBufferSize = 65536
)

type Server struct {
conn *net.UDPConn
addr *net.UDPAddr
wg sync.WaitGroup
done chan struct{}

batcher *tsdb.PointBatcher

batchSize int
batchTimeout time.Duration
database string

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
}

Logger *log.Logger
}

func NewServer(db string) *Server {
return &Server{
done: make(chan struct{}),
database: db,
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags),
}
}

func (s *Server) SetBatchSize(sz int) { s.batchSize = sz }
func (s *Server) SetBatchTimeout(d time.Duration) { s.batchTimeout = d }

func (s *Server) ListenAndServe(iface string) (err error) {
if iface == "" {
return errors.New("bind address has to be specified in config")
}
if s.database == "" {
return errors.New("database has to be specified in config")
}

s.addr, err = net.ResolveUDPAddr("udp", iface)
if err != nil {
s.Logger.Printf("Failed to resolve UDP address %s: %s", iface, err)
return err
}

s.conn, err = net.ListenUDP("udp", s.addr)
if err != nil {
s.Logger.Printf("Failed to set up UDP listener at address %s: %s", s.addr, err)
return err
}

s.Logger.Printf("Started listening on %s", iface)

s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchTimeout)

s.wg.Add(2)
go s.serve()
go s.writePoints()

return nil
}

func (s *Server) writePoints() {
defer s.wg.Done()

for {
select {
case batch := <-s.batcher.Out():
err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: s.database,
RetentionPolicy: "",
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: batch,
})
if err != nil {
s.Logger.Printf("Failed to write point batch to database %q: %s\n", s.database, err)
} else {
s.Logger.Printf("Wrote a batch of %d points to %s", len(batch), s.database)
}
case <-s.done:
return
}
}
}

func (s *Server) serve() {
defer s.wg.Done()

s.batcher.Start()
for {
buf := make([]byte, UDPBufferSize)
n, _, err := s.conn.ReadFromUDP(buf)
if err != nil {
s.Logger.Printf("Failed to read UDP message: %s", err)
s.batcher.Flush()
return
}

points, err := tsdb.ParsePoints(buf[:n])
if err != nil {
s.Logger.Printf("Failed to parse points: %s", err)
continue
}

s.Logger.Printf("Received write for %d points on database %s", len(points), s.database)

for _, point := range points {
s.batcher.In() <- point
}
}
}

func (s *Server) Close() error {
var err error
if s.conn != nil {
err = s.conn.Close()
}

if s.done != nil {
close(s.done)
}

s.wg.Wait()

return err
}

func (s *Server) Addr() net.Addr {
return s.addr
}

0 comments on commit f0dd608

Please sign in to comment.