Skip to content

Commit

Permalink
Merge pull request #2794 from benbjohnson/mux
Browse files Browse the repository at this point in the history
Influxd mux integration
  • Loading branch information
benbjohnson committed Jun 6, 2015
2 parents 041b31d + fb06549 commit 9ec6e4f
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 157 deletions.
5 changes: 0 additions & 5 deletions cluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,18 @@ import (
)

const (
// DefaultBindAddress is the default bind address for the HTTP server.
DefaultBindAddress = ":8087"

// DefaultShardWriterTimeout is the default timeout set on shard writers.
DefaultShardWriterTimeout = 5 * time.Second
)

// Config represents the configuration for the the clustering service.
type Config struct {
BindAddress string `toml:"bind-address"`
ShardWriterTimeout toml.Duration `toml:"shard-writer-timeout"`
}

// NewConfig returns an instance of Config with defaults.
func NewConfig() Config {
return Config{
BindAddress: DefaultBindAddress,
ShardWriterTimeout: toml.Duration(DefaultShardWriterTimeout),
}
}
5 changes: 1 addition & 4 deletions cluster/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@ func TestConfig_Parse(t *testing.T) {
// Parse configuration.
var c cluster.Config
if _, err := toml.Decode(`
bind-address = ":8080"
shard-writer-timeout = "10s"
`, &c); err != nil {
t.Fatal(err)
}

// Validate configuration.
if c.BindAddress != ":8080" {
t.Fatalf("unexpected bind address: %s", c.BindAddress)
} else if time.Duration(c.ShardWriterTimeout) != 10*time.Second {
if time.Duration(c.ShardWriterTimeout) != 10*time.Second {
t.Fatalf("unexpected bind address: %s", c.ShardWriterTimeout)
}
}
35 changes: 10 additions & 25 deletions cluster/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@ import (
// MaxMessageSize defines how large a message can be before we reject it
const MaxMessageSize = 1024 * 1024 * 1024 // 1GB

// MuxHeader is the header byte used in the TCP mux.
const MuxHeader = 2

// Service processes data received over raw TCP connections.
type Service struct {
mu sync.RWMutex
addr string
ln net.Listener
mu sync.RWMutex

wg sync.WaitGroup
closing chan struct{}

Listener net.Listener

TSDBStore interface {
WriteToShard(shardID uint64, points []tsdb.Point) error
}
Expand All @@ -35,23 +38,13 @@ type Service struct {
// NewService returns a new instance of Service.
func NewService(c Config) *Service {
return &Service{
addr: c.BindAddress,
closing: make(chan struct{}),
Logger: log.New(os.Stderr, "[tcp] ", log.LstdFlags),
}
}

// Open opens the network listener and begins serving requests.
func (s *Service) Open() error {
// Open TCP listener.
ln, err := net.Listen("tcp", s.addr)
if err != nil {
return err
}
s.ln = ln

s.Logger.Println("listening on TCP:", ln.Addr().String())

// Begin serving conections.
s.wg.Add(1)
go s.serve()
Expand All @@ -77,7 +70,7 @@ func (s *Service) serve() {
}

// Accept the next connection.
conn, err := s.ln.Accept()
conn, err := s.Listener.Accept()
if opErr, ok := err.(*net.OpError); ok && opErr.Temporary() {
s.Logger.Println("error temporarily accepting TCP connection", err.Error())
continue
Expand All @@ -96,25 +89,17 @@ func (s *Service) serve() {

// Close shuts down the listener and waits for all connections to finish.
func (s *Service) Close() error {
if s.ln != nil {
s.ln.Close()
if s.Listener != nil {
s.Listener.Close()
}

// Shut down all handlers.
close(s.closing)
s.wg.Wait()
// s.wg.Wait() // FIXME(benbjohnson)

return nil
}

// Addr returns the network address of the service.
func (s *Service) Addr() net.Addr {
if s.ln != nil {
return s.ln.Addr()
}
return nil
}

// handleConn services an individual TCP connection.
func (s *Service) handleConn(conn net.Conn) {
// Ensure connection is closed when service is closed.
Expand Down
22 changes: 22 additions & 0 deletions cluster/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package cluster_test

import (
"fmt"
"net"
"time"

"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/meta"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
)

Expand All @@ -22,11 +25,30 @@ func (m *metaStore) Node(nodeID uint64) (*meta.NodeInfo, error) {
type testService struct {
nodeID uint64
writeShardFunc func(shardID uint64, points []tsdb.Point) error
ln net.Listener
muxln net.Listener
}

func newTestService(f func(shardID uint64, points []tsdb.Point) error) testService {
ln, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
panic(err)
}

mux := tcp.NewMux()
muxln := mux.Listen(cluster.MuxHeader)
go mux.Serve(ln)

return testService{
writeShardFunc: f,
ln: ln,
muxln: muxln,
}
}

func (ts *testService) Close() {
if ts.ln != nil {
ts.ln.Close()
}
}

Expand Down
7 changes: 7 additions & 0 deletions cluster/shard_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,5 +144,12 @@ func (c *connFactory) dial() (net.Conn, error) {
return nil, err
}

// Write a marker byte for cluster messages.
_, err = conn.Write([]byte{MuxHeader})
if err != nil {
conn.Close()
return nil, err
}

return conn, nil
}
24 changes: 16 additions & 8 deletions cluster/shard_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (
// Ensure the shard writer can successful write a single request.
func TestShardWriter_WriteShard_Success(t *testing.T) {
ts := newTestService(writeShardSuccess)
s := cluster.NewService(cluster.Config{BindAddress: "127.0.0.1:0"})
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = ts
if err := s.Open(); err != nil {
t.Fatal(err)
}
defer s.Close()
defer ts.Close()

w := cluster.NewShardWriter(time.Minute)
w.MetaStore = &metaStore{host: s.Addr().String()}
w.MetaStore = &metaStore{host: ts.ln.Addr().String()}

// Build a single point.
now := time.Now()
Expand Down Expand Up @@ -58,15 +60,17 @@ func TestShardWriter_WriteShard_Success(t *testing.T) {
// Ensure the shard writer can successful write a multiple requests.
func TestShardWriter_WriteShard_Multiple(t *testing.T) {
ts := newTestService(writeShardSuccess)
s := cluster.NewService(cluster.Config{BindAddress: "127.0.0.1:0"})
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = ts
if err := s.Open(); err != nil {
t.Fatal(err)
}
defer s.Close()
defer ts.Close()

w := cluster.NewShardWriter(time.Minute)
w.MetaStore = &metaStore{host: s.Addr().String()}
w.MetaStore = &metaStore{host: ts.ln.Addr().String()}

// Build a single point.
now := time.Now()
Expand Down Expand Up @@ -105,15 +109,17 @@ func TestShardWriter_WriteShard_Multiple(t *testing.T) {
// Ensure the shard writer returns an error when the server fails to accept the write.
func TestShardWriter_WriteShard_Error(t *testing.T) {
ts := newTestService(writeShardFail)
s := cluster.NewService(cluster.Config{BindAddress: "127.0.0.1:0"})
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = ts
if err := s.Open(); err != nil {
t.Fatal(err)
}
defer s.Close()
defer ts.Close()

w := cluster.NewShardWriter(time.Minute)
w.MetaStore = &metaStore{host: s.Addr().String()}
w.MetaStore = &metaStore{host: ts.ln.Addr().String()}
now := time.Now()

shardID := uint64(1)
Expand All @@ -131,15 +137,17 @@ func TestShardWriter_WriteShard_Error(t *testing.T) {
// Ensure the shard writer returns an error when dialing times out.
func TestShardWriter_Write_ErrDialTimeout(t *testing.T) {
ts := newTestService(writeShardSuccess)
s := cluster.NewService(cluster.Config{BindAddress: "127.0.0.1:0"})
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.TSDBStore = ts
if err := s.Open(); err != nil {
t.Fatal(err)
}
defer s.Close()
defer ts.Close()

w := cluster.NewShardWriter(time.Nanosecond)
w.MetaStore = &metaStore{host: s.Addr().String()}
w.MetaStore = &metaStore{host: ts.ln.Addr().String()}
now := time.Now()

shardID := uint64(1)
Expand Down
42 changes: 40 additions & 2 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package run

import (
"fmt"
"net"
"time"

"github.com/influxdb/influxdb/cluster"
Expand All @@ -13,6 +14,7 @@ import (
"github.com/influxdb/influxdb/services/opentsdb"
"github.com/influxdb/influxdb/services/retention"
"github.com/influxdb/influxdb/services/udp"
"github.com/influxdb/influxdb/tcp"
"github.com/influxdb/influxdb/tsdb"
)

Expand All @@ -23,6 +25,10 @@ type Server struct {
err chan error
closing chan struct{}

Hostname string
BindAddress string
Listener net.Listener

MetaStore *meta.Store
TSDBStore *tsdb.Store
QueryExecutor *tsdb.QueryExecutor
Expand All @@ -36,8 +42,12 @@ type Server struct {
func NewServer(c *Config) *Server {
// Construct base meta store and data store.
s := &Server{
err: make(chan error),
closing: make(chan struct{}),
err: make(chan error),
closing: make(chan struct{}),

Hostname: c.Meta.Hostname,
BindAddress: c.Meta.BindAddress,

MetaStore: meta.NewStore(c.Meta),
TSDBStore: tsdb.NewStore(c.Data.Dir),
}
Expand Down Expand Up @@ -146,6 +156,31 @@ func (s *Server) Err() <-chan error { return s.err }
// Open opens the meta and data store and all services.
func (s *Server) Open() error {
if err := func() error {
// Resolve host to address.
_, port, err := net.SplitHostPort(s.BindAddress)
if err != nil {
return fmt.Errorf("split bind address: %s", err)
}
hostport := net.JoinHostPort(s.Hostname, port)
addr, err := net.ResolveTCPAddr("tcp", hostport)
if err != nil {
return fmt.Errorf("resolve tcp: addr=%s, err=%s", hostport, err)
}
s.MetaStore.Addr = addr

// Open shared TCP connection.
ln, err := net.Listen("tcp", s.BindAddress)
if err != nil {
return fmt.Errorf("listen: %s", err)
}
s.Listener = ln

// Multiplex listener.
mux := tcp.NewMux()
s.MetaStore.RaftListener = mux.Listen(meta.MuxRaftHeader)
s.MetaStore.ExecListener = mux.Listen(meta.MuxExecHeader)
s.Services[0].(*cluster.Service).Listener = mux.Listen(cluster.MuxHeader)

// Open meta store.
if err := s.MetaStore.Open(); err != nil {
return fmt.Errorf("open meta store: %s", err)
Expand Down Expand Up @@ -178,6 +213,9 @@ func (s *Server) Open() error {

// Close shuts down the meta and data stores and all services.
func (s *Server) Close() error {
if s.Listener != nil {
s.Listener.Close()
}
if s.MetaStore != nil {
s.MetaStore.Close()
}
Expand Down
1 change: 0 additions & 1 deletion cmd/influxd/run/server_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,6 @@ func (s *Server) Write(db, rp, body string, params url.Values) (results string,
// NewConfig returns the default config with temporary paths.
func NewConfig() *run.Config {
c := run.NewConfig()
c.Cluster.BindAddress = "127.0.0.1:0"
c.Meta.Dir = MustTempFile()
c.Meta.BindAddress = "127.0.0.1:0"
c.Meta.HeartbeatTimeout = toml.Duration(50 * time.Millisecond)
Expand Down
Loading

0 comments on commit 9ec6e4f

Please sign in to comment.