diff --git a/cmd/influxd/run/config.go b/cmd/influxd/run/config.go index 1eeb4cb47f0..fb1c4d5929f 100644 --- a/cmd/influxd/run/config.go +++ b/cmd/influxd/run/config.go @@ -19,6 +19,7 @@ import ( "github.com/influxdb/influxdb/services/httpd" "github.com/influxdb/influxdb/services/monitor" "github.com/influxdb/influxdb/services/opentsdb" + "github.com/influxdb/influxdb/services/udp" "github.com/influxdb/influxdb/tsdb" ) @@ -77,6 +78,7 @@ type Config struct { Graphites []graphite.Config `toml:"graphite"` Collectd collectd.Config `toml:"collectd"` OpenTSDB opentsdb.Config `toml:"opentsdb"` + UDP udp.Config `toml:"udp"` // Snapshot SnapshotConfig `toml:"snapshot"` Monitoring monitor.Config `toml:"monitoring"` diff --git a/cmd/influxd/run/config_test.go b/cmd/influxd/run/config_test.go index 1c06e32a0f3..e6d2cfdf0cb 100644 --- a/cmd/influxd/run/config_test.go +++ b/cmd/influxd/run/config_test.go @@ -45,6 +45,9 @@ bind-address = ":1000" [opentsdb] bind-address = ":2000" +[udp] +bind-address = ":4444" + [monitoring] enabled = true @@ -81,6 +84,8 @@ enabled = true t.Fatalf("unexpected collectd bind address: %s", c.Collectd.BindAddress) } else if c.OpenTSDB.BindAddress != ":2000" { t.Fatalf("unexpected opentsdb bind address: %s", c.OpenTSDB.BindAddress) + } else if c.UDP.BindAddress != ":4444" { + t.Fatalf("unexpected udp bind address: %s", c.UDP.BindAddress) } else if c.Monitoring.Enabled != true { t.Fatalf("unexpected monitoring enabled: %v", c.Monitoring.Enabled) } else if c.ContinuousQuery.Enabled != true { diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go index 5bd425f5437..779772b5d49 100644 --- a/cmd/influxd/run/server.go +++ b/cmd/influxd/run/server.go @@ -11,6 +11,7 @@ import ( "github.com/influxdb/influxdb/services/graphite" "github.com/influxdb/influxdb/services/httpd" "github.com/influxdb/influxdb/services/opentsdb" + "github.com/influxdb/influxdb/services/udp" "github.com/influxdb/influxdb/tsdb" ) @@ -55,6 +56,7 @@ func NewServer(c *Config, joinURLs string) *Server { s.appendHTTPDService(c.HTTPD) s.appendCollectdService(c.Collectd) s.appendOpenTSDBService(c.OpenTSDB) + s.appendUDPService(c.UDP) for _, g := range c.Graphites { s.appendGraphiteService(g) } @@ -96,6 +98,12 @@ func (s *Server) appendGraphiteService(c graphite.Config) { s.Services = append(s.Services, srv) } +func (s *Server) appendUDPService(c udp.Config) { + srv := udp.NewService(c) + srv.Server.PointsWriter = s.PointsWriter + s.Services = append(s.Services, srv) +} + // Open opens the meta and data store and all services. func (s *Server) Open() error { if err := func() error { diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index cdcb813bf38..940bb10a1b5 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -1010,6 +1010,9 @@ func NewConfig() *run.Config { c.HTTPD.Enabled = true c.HTTPD.BindAddress = "127.0.0.1:0" c.HTTPD.LogEnabled = testing.Verbose() + + c.UDP.BindAddress = "127.0.0.1:0" + c.UDP.Database = "db0" return c } diff --git a/etc/config.sample.toml b/etc/config.sample.toml index c5adc091576..32282e1f647 100644 --- a/etc/config.sample.toml +++ b/etc/config.sample.toml @@ -71,8 +71,10 @@ enabled = false # Configure UDP listener for series data. [udp] enabled = false -# bind-address = "0.0.0.0" -# port = 4444 +# bind-address = ":4444" +# database = "udp_database" +# batch-size = 0 # How many points to batch up internally before writing. +# batch-timeout = "0ms" # Maximum time to wait before sending batch, regardless of current size. # Broker configuration. Brokers are nodes which participate in distributed # consensus. diff --git a/services/udp/config.go b/services/udp/config.go new file mode 100644 index 00000000000..0e5a2b1b0f4 --- /dev/null +++ b/services/udp/config.go @@ -0,0 +1,12 @@ +package udp + +import "github.com/influxdb/influxdb/toml" + +type Config struct { + Enabled bool `toml:"enabled"` + BindAddress string `toml:"bind-address"` + + Database string `toml:"database"` + BatchSize int `toml:"batch-size"` + BatchTimeout toml.Duration `toml:"batch-timeout"` +} diff --git a/services/udp/config_test.go b/services/udp/config_test.go new file mode 100644 index 00000000000..d094c74c9af --- /dev/null +++ b/services/udp/config_test.go @@ -0,0 +1,36 @@ +package udp_test + +import ( + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/influxdb/influxdb/services/udp" +) + +func TestConfig_Parse(t *testing.T) { + // Parse configuration. + var c udp.Config + if _, err := toml.Decode(` +enabled = true +bind-address = ":4444" +database = "awesomedb" +batch-size = 100 +batch-timeout = "10ms" +`, &c); err != nil { + t.Fatal(err) + } + + // Validate configuration. + if c.Enabled != true { + t.Fatalf("unexpected enabled: %v", c.Enabled) + } else if c.BindAddress != ":4444" { + t.Fatalf("unexpected bind address: %s", c.BindAddress) + } else if c.Database != "awesomedb" { + t.Fatalf("unexpected database: %s", c.Database) + } else if c.BatchSize != 100 { + t.Fatalf("unexpected batch size: %d", c.BatchSize) + } else if time.Duration(c.BatchTimeout) != (10 * time.Millisecond) { + t.Fatalf("unexpected batch timeout: %v", c.BatchTimeout) + } +} diff --git a/services/udp/service.go b/services/udp/service.go new file mode 100644 index 00000000000..f328eb513ff --- /dev/null +++ b/services/udp/service.go @@ -0,0 +1,35 @@ +package udp + +import ( + "net" + "time" +) + +type Service struct { + Server *Server + + addr string +} + +func NewService(c Config) *Service { + server := NewServer(c.Database) + server.SetBatchSize(c.BatchSize) + server.SetBatchTimeout(time.Duration(c.BatchTimeout)) + + return &Service{ + addr: c.BindAddress, + Server: server, + } +} + +func (s *Service) Open() error { + return s.Server.ListenAndServe(s.addr) +} + +func (s *Service) Close() error { + return s.Server.Close() +} + +func (s *Service) Addr() net.Addr { + return s.Server.Addr() +} diff --git a/services/udp/udp.go b/services/udp/udp.go new file mode 100644 index 00000000000..61bb4be0e2a --- /dev/null +++ b/services/udp/udp.go @@ -0,0 +1,147 @@ +package udp + +import ( + "errors" + "log" + "net" + "os" + "sync" + "time" + + "github.com/influxdb/influxdb/cluster" + "github.com/influxdb/influxdb/tsdb" +) + +const ( + UDPBufferSize = 65536 +) + +type Server struct { + conn *net.UDPConn + addr *net.UDPAddr + wg sync.WaitGroup + done chan struct{} + + batcher *tsdb.PointBatcher + + batchSize int + batchTimeout time.Duration + database string + + PointsWriter interface { + WritePoints(p *cluster.WritePointsRequest) error + } + + Logger *log.Logger +} + +func NewServer(db string) *Server { + return &Server{ + done: make(chan struct{}), + database: db, + Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags), + } +} + +func (s *Server) SetBatchSize(sz int) { s.batchSize = sz } +func (s *Server) SetBatchTimeout(d time.Duration) { s.batchTimeout = d } + +func (s *Server) ListenAndServe(iface string) (err error) { + if iface == "" { + return errors.New("bind address has to be specified in config") + } + if s.database == "" { + return errors.New("database has to be specified in config") + } + + s.addr, err = net.ResolveUDPAddr("udp", iface) + if err != nil { + s.Logger.Printf("Failed to resolve UDP address %s: %s", iface, err) + return err + } + + s.conn, err = net.ListenUDP("udp", s.addr) + if err != nil { + s.Logger.Printf("Failed to set up UDP listener at address %s: %s", s.addr, err) + return err + } + + s.Logger.Printf("Started listening on %s", iface) + + s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchTimeout) + + s.wg.Add(2) + go s.serve() + go s.writePoints() + + return nil +} + +func (s *Server) writePoints() { + defer s.wg.Done() + + for { + select { + case batch := <-s.batcher.Out(): + err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{ + Database: s.database, + RetentionPolicy: "", + ConsistencyLevel: cluster.ConsistencyLevelOne, + Points: batch, + }) + if err != nil { + s.Logger.Printf("Failed to write point batch to database %q: %s\n", s.database, err) + } else { + s.Logger.Printf("Wrote a batch of %d points to %s", len(batch), s.database) + } + case <-s.done: + return + } + } +} + +func (s *Server) serve() { + defer s.wg.Done() + + s.batcher.Start() + for { + buf := make([]byte, UDPBufferSize) + n, _, err := s.conn.ReadFromUDP(buf) + if err != nil { + s.Logger.Printf("Failed to read UDP message: %s", err) + s.batcher.Flush() + return + } + + points, err := tsdb.ParsePoints(buf[:n]) + if err != nil { + s.Logger.Printf("Failed to parse points: %s", err) + continue + } + + s.Logger.Printf("Received write for %d points on database %s", len(points), s.database) + + for _, point := range points { + s.batcher.In() <- point + } + } +} + +func (s *Server) Close() error { + var err error + if s.conn != nil { + err = s.conn.Close() + } + + if s.done != nil { + close(s.done) + } + + s.wg.Wait() + + return err +} + +func (s *Server) Addr() net.Addr { + return s.addr +}