diff --git a/src/configuration/configuration.go b/src/configuration/configuration.go index 13ddf2dc4f9..86a6de7269d 100644 --- a/src/configuration/configuration.go +++ b/src/configuration/configuration.go @@ -183,8 +183,8 @@ type WalConfig struct { } type InputPlugins struct { - Graphite GraphiteConfig `toml:"graphite"` - UdpInput UdpInputConfig `toml:"udp"` + Graphite GraphiteConfig `toml:"graphite"` + UdpInput UdpInputConfig `toml:"udp"` UdpServersInput []UdpInputConfig `toml:"udp_servers"` } @@ -331,10 +331,7 @@ func parseTomlConfiguration(filename string) (*Configuration, error) { GraphiteDatabase: tomlConfiguration.InputPlugins.Graphite.Database, GraphiteUdpEnabled: tomlConfiguration.InputPlugins.Graphite.UdpEnabled, - UdpInputEnabled: tomlConfiguration.InputPlugins.UdpInput.Enabled, - UdpInputPort: tomlConfiguration.InputPlugins.UdpInput.Port, - UdpInputDatabase: tomlConfiguration.InputPlugins.UdpInput.Database, - UdpServers: tomlConfiguration.InputPlugins.UdpServersInput, + UdpServers: tomlConfiguration.InputPlugins.UdpServersInput, RaftServerPort: tomlConfiguration.Raft.Port, RaftTimeout: tomlConfiguration.Raft.Timeout, @@ -369,6 +366,12 @@ func parseTomlConfiguration(filename string) (*Configuration, error) { ConcurrentShardQueryLimit: defaultConcurrentShardQueryLimit, } + config.UdpServers = append(config.UdpServers, UdpInputConfig{ + Enabled: tomlConfiguration.InputPlugins.UdpInput.Enabled, + Database: tomlConfiguration.InputPlugins.UdpInput.Database, + Port: tomlConfiguration.InputPlugins.UdpInput.Port, + }) + if config.LocalStoreWriteBufferSize == 0 { config.LocalStoreWriteBufferSize = 1000 } diff --git a/src/server/server.go b/src/server/server.go index 50271b98f49..5659dbd044a 100644 --- a/src/server/server.go +++ b/src/server/server.go @@ -137,38 +137,23 @@ func (self *Server) ListenAndServe() error { } } - // singular UDP input - if self.Config.UdpInputEnabled { - if self.Config.UdpInputPort <= 0 || self.Config.UdpInputDatabase == "" { + // UDP input + for _, udpInput := range self.Config.UdpServers { + port := udpInput.Port + database := udpInput.Database + + if port <= 0 || database == "" { log.Warn("Cannot start udp server. please check your configuration") - } else { - log.Info("Starting UDP Listener on port %d to database %s", self.Config.UdpInputPort, self.Config.UdpInputDatabase) - - self.UdpApi = udp.NewServer(self.Config.UdpInputPortString(self.Config.UdpInputPort), self.Config.UdpInputDatabase, self.Coordinator, self.ClusterConfig) - go self.UdpApi.ListenAndServe() + continue } - } - - // multiple UDP input - udpServersCount := len(self.Config.UdpServers) - if udpServersCount > 0 { - for i := 0; i < udpServersCount; i++ { - port := self.Config.UdpServers[i].Port - database := self.Config.UdpServers[i].Database + log.Info("Starting UDP Listener on port %d to database %s", port, database) - if port <= 0 || database == "" { - log.Warn("Cannot start udp server. please check your configuration") - } else { - log.Info("Starting UDP Listener on port %d to database %s", port, database) + addr := self.Config.UdpInputPortString(port) - listenAddress := self.Config.UdpInputPortString(port) - - self.UdpServers[i] = udp.NewServer(listenAddress, database, self.Coordinator, self.ClusterConfig) - go self.UdpServers[i].ListenAndServe() - } - - } + server := udp.NewServer(addr, database, self.Coordinator, self.ClusterConfig) + self.UdpServers = append(self.UdpServers, server) + go server.ListenAndServe() } // start processing continuous queries