-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c53acaf
commit 3203183
Showing
9 changed files
with
262 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
package udp | ||
|
||
import "github.com/influxdb/influxdb/toml" | ||
|
||
type Config struct { | ||
Enabled bool `toml:"enabled"` | ||
BindAddress string `toml:"bind-address"` | ||
|
||
Database string `toml:"database"` | ||
BatchSize int `toml:"batch-size"` | ||
BatchTimeout toml.Duration `toml:"batch-timeout"` | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
package udp_test | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/BurntSushi/toml" | ||
"github.com/influxdb/influxdb/services/udp" | ||
) | ||
|
||
func TestConfig_Parse(t *testing.T) { | ||
// Parse configuration. | ||
var c udp.Config | ||
if _, err := toml.Decode(` | ||
enabled = true | ||
bind-address = ":4444" | ||
database = "awesomedb" | ||
batch-size = 100 | ||
batch-timeout = "10ms" | ||
`, &c); err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
// Validate configuration. | ||
if c.Enabled != true { | ||
t.Fatalf("unexpected enabled: %v", c.Enabled) | ||
} else if c.BindAddress != ":4444" { | ||
t.Fatalf("unexpected bind address: %s", c.BindAddress) | ||
} else if c.Database != "awesomedb" { | ||
t.Fatalf("unexpected database: %s", c.Database) | ||
} else if c.BatchSize != 100 { | ||
t.Fatalf("unexpected batch size: %d", c.BatchSize) | ||
} else if time.Duration(c.BatchTimeout) != (10 * time.Millisecond) { | ||
t.Fatalf("unexpected batch timeout: %v", c.BatchTimeout) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package udp | ||
|
||
import ( | ||
"net" | ||
"time" | ||
) | ||
|
||
type Service struct { | ||
Server *Server | ||
|
||
addr string | ||
} | ||
|
||
func NewService(c Config) *Service { | ||
server := NewServer(c.Database) | ||
server.SetBatchSize(c.BatchSize) | ||
server.SetBatchTimeout(time.Duration(c.BatchTimeout)) | ||
|
||
return &Service{ | ||
addr: c.BindAddress, | ||
Server: server, | ||
} | ||
} | ||
|
||
func (s *Service) Open() error { | ||
return s.Server.ListenAndServe(s.addr) | ||
} | ||
|
||
func (s *Service) Close() error { | ||
return s.Server.Close() | ||
} | ||
|
||
func (s *Service) Addr() net.Addr { | ||
return s.Server.Addr() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
package udp | ||
|
||
import ( | ||
"errors" | ||
"log" | ||
"net" | ||
"os" | ||
"sync" | ||
"time" | ||
|
||
"github.com/influxdb/influxdb/cluster" | ||
"github.com/influxdb/influxdb/tsdb" | ||
) | ||
|
||
const ( | ||
UDPBufferSize = 65536 | ||
) | ||
|
||
type Server struct { | ||
conn *net.UDPConn | ||
addr *net.UDPAddr | ||
wg sync.WaitGroup | ||
done chan struct{} | ||
|
||
batcher *tsdb.PointBatcher | ||
|
||
batchSize int | ||
batchTimeout time.Duration | ||
database string | ||
|
||
PointsWriter interface { | ||
WritePoints(p *cluster.WritePointsRequest) error | ||
} | ||
|
||
Logger *log.Logger | ||
} | ||
|
||
func NewServer(db string) *Server { | ||
return &Server{ | ||
done: make(chan struct{}), | ||
database: db, | ||
Logger: log.New(os.Stderr, "[udp] ", log.LstdFlags), | ||
} | ||
} | ||
|
||
func (s *Server) SetBatchSize(sz int) { s.batchSize = sz } | ||
func (s *Server) SetBatchTimeout(d time.Duration) { s.batchTimeout = d } | ||
|
||
func (s *Server) ListenAndServe(iface string) (err error) { | ||
if iface == "" { | ||
return errors.New("bind address has to be specified in config") | ||
} | ||
if s.database == "" { | ||
return errors.New("database has to be specified in config") | ||
} | ||
|
||
s.addr, err = net.ResolveUDPAddr("udp", iface) | ||
if err != nil { | ||
s.Logger.Printf("Failed to resolve UDP address %s: %s", iface, err) | ||
return err | ||
} | ||
|
||
s.conn, err = net.ListenUDP("udp", s.addr) | ||
if err != nil { | ||
s.Logger.Printf("Failed to set up UDP listener at address %s: %s", s.addr, err) | ||
return err | ||
} | ||
|
||
s.Logger.Printf("Started listening on %s", iface) | ||
|
||
s.batcher = tsdb.NewPointBatcher(s.batchSize, s.batchTimeout) | ||
|
||
s.wg.Add(2) | ||
go s.serve() | ||
go s.writePoints() | ||
|
||
return nil | ||
} | ||
|
||
func (s *Server) writePoints() { | ||
defer s.wg.Done() | ||
|
||
for { | ||
select { | ||
case batch := <-s.batcher.Out(): | ||
err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{ | ||
Database: s.database, | ||
RetentionPolicy: "", | ||
ConsistencyLevel: cluster.ConsistencyLevelOne, | ||
Points: batch, | ||
}) | ||
if err != nil { | ||
s.Logger.Printf("Failed to write point batch to database %q: %s\n", s.database, err) | ||
} else { | ||
s.Logger.Printf("Wrote a batch of %d points to %s", len(batch), s.database) | ||
} | ||
case <-s.done: | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (s *Server) serve() { | ||
defer s.wg.Done() | ||
|
||
s.batcher.Start() | ||
for { | ||
buf := make([]byte, UDPBufferSize) | ||
|
||
select { | ||
case <-s.done: | ||
// We closed the connection, time to go. | ||
return | ||
default: | ||
// Keep processing. | ||
} | ||
|
||
n, _, err := s.conn.ReadFromUDP(buf) | ||
if err != nil { | ||
s.Logger.Printf("Failed to read UDP message: %s", err) | ||
continue | ||
} | ||
|
||
points, err := tsdb.ParsePoints(buf[:n]) | ||
if err != nil { | ||
s.Logger.Printf("Failed to parse points: %s", err) | ||
continue | ||
} | ||
|
||
s.Logger.Printf("Received write for %d points on database %s", len(points), s.database) | ||
|
||
for _, point := range points { | ||
s.batcher.In() <- point | ||
} | ||
} | ||
} | ||
|
||
func (s *Server) Close() error { | ||
if s.conn == nil { | ||
return errors.New("Server already closed") | ||
} | ||
|
||
s.conn.Close() | ||
s.batcher.Flush() | ||
close(s.done) | ||
s.wg.Wait() | ||
|
||
// Release all remaining resources. | ||
s.done = nil | ||
s.conn = nil | ||
|
||
s.Logger.Print("Server closed") | ||
|
||
return nil | ||
} | ||
|
||
func (s *Server) Addr() net.Addr { | ||
return s.addr | ||
} |