Skip to content

Commit

Permalink
embed: start peer listeners before EtcdServer start
Browse files Browse the repository at this point in the history
  • Loading branch information
gyuho committed Sep 19, 2017
1 parent dbba6c9 commit dd7db5b
Showing 1 changed file with 40 additions and 35 deletions.
75 changes: 40 additions & 35 deletions embed/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,35 +163,11 @@ func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))

e.Server.Start()
// configure/start peer handlers after rafthttp.Transport started,
// and before initial hash checking in EtcdServer.Start
e.servePeerListeners(cfg)

// configure peer handlers after rafthttp.Transport started
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
if !cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
return
}
}
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
}
e.Server.Start()

if err = e.serve(); err != nil {
return
Expand Down Expand Up @@ -392,21 +368,50 @@ func startClientListeners(cfg *Config) (sctxs map[string]*serveCtx, err error) {
return sctxs, nil
}

func (e *Etcd) serve() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
func (e *Etcd) servePeerListeners(cfg *Config) {
ph := etcdhttp.NewPeerHandler(e.Server)
var peerTLScfg *tls.Config
var err error
if !cfg.PeerTLSInfo.Empty() {
if peerTLScfg, err = cfg.PeerTLSInfo.ServerConfig(); err != nil {
return
}
}

if e.cfg.CorsInfo.String() != "" {
plog.Infof("cors = %s", e.cfg.CorsInfo)
for _, p := range e.Peers {
gs := v3rpc.Server(e.Server, peerTLScfg)
m := cmux.New(p.Listener)
go gs.Serve(m.Match(cmux.HTTP2()))
srv := &http.Server{
Handler: grpcHandlerFunc(gs, ph),
ReadTimeout: 5 * time.Minute,
ErrorLog: defaultLog.New(ioutil.Discard, "", 0), // do not log user error
}
go srv.Serve(m.Match(cmux.Any()))
p.serve = func() error { return m.Serve() }
p.close = func(ctx context.Context) error {
// gracefully shutdown http.Server
// close open listeners, idle connections
// until context cancel or time-out
e.stopGRPCServer(gs)
return srv.Shutdown(ctx)
}
}

// Start the peer server in a goroutine
for _, pl := range e.Peers {
go func(l *peerListener) {
e.errHandler(l.serve())
}(pl)
}
}

func (e *Etcd) serve() (err error) {
if !e.cfg.ClientTLSInfo.Empty() {
plog.Infof("ClientTLS: %s", e.cfg.ClientTLSInfo)
}

if e.cfg.CorsInfo.String() != "" {
plog.Infof("cors = %s", e.cfg.CorsInfo)
}

// Start a client server goroutine for each listen address
var h http.Handler
Expand Down

0 comments on commit dd7db5b

Please sign in to comment.