Skip to content

Commit

Permalink
Create databases for Graphite at higher level
Browse files Browse the repository at this point in the history
By doing this the SeriesWriter interface stays focused and just has
methods for writing data. Its name then remains coherent.
  • Loading branch information
otoolep committed Mar 12, 2015
1 parent f8c39d4 commit d82d040
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 75 deletions.
10 changes: 10 additions & 0 deletions cmd/influxd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const (
// DefaultRetentionCreatePeriod represents how often the server will check to see if new
// shard groups need to be created in advance for writing
DefaultRetentionCreatePeriod = 45 * time.Minute

// DefaultGraphiteDatabaseName is the default Graphite database if none is specified
DefaultGraphiteDatabaseName = "graphite"
)

// Config represents the configuration format for the influxd binary.
Expand Down Expand Up @@ -379,6 +382,13 @@ func (g *Graphite) NameSeparatorString() string {
return g.NameSeparator
}

func (g *Graphite) DatabaseString() string {
if g.Database == "" {
return DefaultGraphiteDatabaseName
}
return g.Database
}

// LastEnabled returns whether the Graphite Server shoudl intepret the last field as "name".
func (g *Graphite) LastEnabled() bool {
return g.NamePosition == strings.ToLower("last")
Expand Down
5 changes: 2 additions & 3 deletions cmd/influxd/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ func TestParseConfig(t *testing.T) {
t.Fatalf("graphite udp address mismatch: expected %v, got %v", "192.168.0.2", udpGraphite.Addr)
case udpGraphite.Port != 2005:
t.Fatalf("graphite udp port mismatch: expected %v, got %v", 2005, udpGraphite.Port)
case udpGraphite.Database != "graphite_udp":
t.Fatalf("graphite database mismatch: expected %v, got %v", "graphite_udp", udpGraphite.Database)
case udpGraphite.DatabaseString() != "graphite":
t.Fatalf("graphite database mismatch: expected %v, got %v", "graphite", udpGraphite.Database)
case strings.ToLower(udpGraphite.Protocol) != "udp":
t.Fatalf("graphite udp protocol mismatch: expected %v, got %v", "udp", strings.ToLower(udpGraphite.Protocol))
}
Expand Down Expand Up @@ -214,7 +214,6 @@ protocol = "udP"
enabled = true
address = "192.168.0.2"
port = 2005
database = "graphite_udp" # store graphite data in this database
# Configure collectd server
[collectd]
Expand Down
9 changes: 6 additions & 3 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,18 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
parser.Separator = c.NameSeparatorString()
parser.LastEnabled = c.LastEnabled()

if err := s.CreateDatabaseIfNotExists(c.DatabaseString()); err != nil {
log.Fatalf("failed to create database for %s Graphite server: %s", c.Protocol, err.Error())
}

// Spin up the server.
var g graphite.Server
g, err := graphite.NewServer(c.Protocol, parser, s)
g, err := graphite.NewServer(c.Protocol, parser, s, c.DatabaseString())
if err != nil {
log.Fatalf("failed to initialize %s Graphite server: %s", c.Protocol, err.Error())
}

g.SetDatabase(c.Database)
g.SetLogOutput(logWriter)

err = g.ListenAndServe(c.ConnectionString(config.BindAddress))
if err != nil {
log.Fatalf("failed to start %s Graphite server: %s", c.Protocol, err.Error())
Expand Down
16 changes: 4 additions & 12 deletions graphite/graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ const (

// DefaultGraphiteNameSeparator represents the default Graphite field separator.
DefaultGraphiteNameSeparator = "."

// DefaultDatabaseName is the default database that is created if none is specified
DefaultDatabaseName = "graphite"
)

var (
Expand All @@ -37,26 +34,21 @@ var (
// SeriesWriter defines the interface for the destination of the data.
type SeriesWriter interface {
WriteSeries(string, string, []influxdb.Point) (uint64, error)
CreateDatabase(string) error
CreateRetentionPolicy(string, *influxdb.RetentionPolicy) error
DatabaseExists(string) bool
}

// Server defines the interface all Graphite servers support.
type Server interface {
SetLogOutput(w io.Writer)
SetDatabase(string)
ListenAndServe(iface string) error
Protocol() string
}

// NewServer return a Graphite server for the given protocol, using the given parser
// and series writer.
func NewServer(protocol string, p *Parser, s SeriesWriter) (Server, error) {
// series writer, and database.
func NewServer(protocol string, p *Parser, s SeriesWriter, db string) (Server, error) {
if strings.ToLower(protocol) == "tcp" {
return NewTCPServer(p, s), nil
return NewTCPServer(p, s, db), nil
} else if strings.ToLower(protocol) == "udp" {
return NewUDPServer(p, s), nil
return NewUDPServer(p, s, db), nil
} else {
return nil, fmt.Errorf("unrecognized Graphite Server protocol %s", protocol)
}
Expand Down
40 changes: 11 additions & 29 deletions graphite/graphite_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@ import (

// TCPServer processes Graphite data received over TCP connections.
type TCPServer struct {
server SeriesWriter
parser *Parser
writer SeriesWriter
parser *Parser
database string

Database string
Logger *log.Logger
Logger *log.Logger
}

// NewTCPServer returns a new instance of a TCPServer.
func NewTCPServer(p *Parser, s SeriesWriter) *TCPServer {
func NewTCPServer(p *Parser, w SeriesWriter, db string) *TCPServer {
return &TCPServer{
parser: p,
server: s,
parser: p,
writer: w,
database: db,
}
}

Expand All @@ -32,30 +33,11 @@ func (s *TCPServer) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[graphite] ", log.LstdFlags)
}

// SetDatabase sets database for all Graphite log output.
func (s *TCPServer) SetDatabase(database string) {
s.Database = database
}

// Protocol returns a string version of the supported protocol.
func (s *TCPServer) Protocol() string {
return "tcp"
}

// ListenAndServe instructs the TCPServer to start processing Graphite data
// on the given interface. iface must be in the form host:port
func (t *TCPServer) ListenAndServe(iface string) error {
if iface == "" { // Make sure we have an address
return ErrBindAddressRequired
} else if t.Database == "" {
// If they didn't specify a database, create one and set a default retention policy.
if !t.server.DatabaseExists(DefaultDatabaseName) {
t.Logger.Printf("default database %q does not exist. creating.\n", DefaultDatabaseName)
if e := t.server.CreateDatabase(DefaultDatabaseName); e != nil {
return e
}
t.Database = DefaultDatabaseName
}
}

ln, err := net.Listen("tcp", iface)
Expand Down Expand Up @@ -97,10 +79,10 @@ func (t *TCPServer) handleConnection(conn net.Conn) {
continue
}

// Send the data to database
_, e := t.server.WriteSeries(t.Database, "", []influxdb.Point{point})
// Send the data to the writer.
_, e := t.writer.WriteSeries(t.database, "", []influxdb.Point{point})
if e != nil {
t.Logger.Printf("failed to write data point to database %q: %s\n", t.Database, e)
t.Logger.Printf("failed to write data point to database %q: %s\n", t.database, e)
}
}
}
38 changes: 10 additions & 28 deletions graphite/graphite_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,19 @@ const (

// UDPServer processes Graphite data received via UDP.
type UDPServer struct {
server SeriesWriter
parser *Parser
writer SeriesWriter
parser *Parser
database string

Database string
Logger *log.Logger
Logger *log.Logger
}

// NewUDPServer returns a new instance of a UDPServer
func NewUDPServer(p *Parser, s SeriesWriter) *UDPServer {
func NewUDPServer(p *Parser, w SeriesWriter, db string) *UDPServer {
u := UDPServer{
parser: p,
server: s,
parser: p,
writer: w,
database: db,
}
return &u
}
Expand All @@ -36,30 +37,11 @@ func (s *UDPServer) SetLogOutput(w io.Writer) {
s.Logger = log.New(w, "[graphite] ", log.LstdFlags)
}

// SetDatabase sets the database for all Graphite log output.
func (s *UDPServer) SetDatabase(database string) {
s.Database = database
}

// Protocol returns a string version of the supported protocol.
func (s *UDPServer) Protocol() string {
return "udp"
}

// ListenAndServer instructs the UDPServer to start processing Graphite data
// on the given interface. iface must be in the form host:port.
func (u *UDPServer) ListenAndServe(iface string) error {
if iface == "" { // Make sure we have an address
return ErrBindAddressRequired
} else if u.Database == "" { // Make sure they have a database
// If they didn't specify a database, create one and set a default retention policy.
if !u.server.DatabaseExists(DefaultDatabaseName) {
u.Logger.Printf("default database %q does not exist. creating.\n", DefaultDatabaseName)
if e := u.server.CreateDatabase(DefaultDatabaseName); e != nil {
return e
}
u.Database = DefaultDatabaseName
}
}

addr, err := net.ResolveUDPAddr("udp", iface)
Expand All @@ -85,8 +67,8 @@ func (u *UDPServer) ListenAndServe(iface string) error {
continue
}

// Send the data to database
_, e := u.server.WriteSeries(u.Database, "", []influxdb.Point{point})
// Send the data to the writer.
_, e := u.writer.WriteSeries(u.database, "", []influxdb.Point{point})
if e != nil {
u.Logger.Printf("failed to write data point: %s\n", e)
}
Expand Down
8 changes: 8 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,14 @@ func (s *Server) CreateDatabase(name string) error {
return err
}

// CreateDatabaseIfNotExists creates a new database if, and only if, it does not exist already.
func (s *Server) CreateDatabaseIfNotExists(name string) error {
if s.DatabaseExists(name) {
return nil
}
return s.CreateDatabase(name)
}

func (s *Server) applyCreateDatabase(m *messaging.Message) (err error) {
var c createDatabaseCommand
mustUnmarshalJSON(m.Data, &c)
Expand Down

0 comments on commit d82d040

Please sign in to comment.