Skip to content

Commit

Permalink
Fix: Graceful shutdown bugs (#1254)
Browse files Browse the repository at this point in the history
* fix graceful shutdown bugs

* simplify log

* fix graceful shutdown bug for invokers

* fix timeout bug

* fix RequestsFinished not working bug

* fix bugs related ShutdownConfig
  • Loading branch information
justxuewei committed Jun 13, 2021
1 parent 863d1f3 commit 7fedc22
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 9 deletions.
6 changes: 4 additions & 2 deletions config/graceful_shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func waitForReceivingRequests() {
// ignore this step
return
}
providerConfig.ShutdownConfig.RejectRequest = true
waitingProcessedTimeout(providerConfig.ShutdownConfig)
}

Expand All @@ -173,6 +174,7 @@ func waitForSendingRequests() {
// ignore this step
return
}
consumerConfig.ShutdownConfig.RejectRequest = true
waitingProcessedTimeout(consumerConfig.ShutdownConfig)
}

Expand All @@ -181,9 +183,9 @@ func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
if timeout <= 0 {
return
}
start := time.Now()
deadline := time.Now().Add(timeout)

for time.Now().After(start.Add(timeout)) && !shutdownConfig.RequestsFinished {
for time.Now().Before(deadline) && !shutdownConfig.RequestsFinished {
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
}
Expand Down
1 change: 0 additions & 1 deletion config/graceful_shutdown_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ type ShutdownConfig struct {
RejectRequestHandler string `yaml:"reject_handler" json:"reject_handler,omitempty" property:"reject_handler"`
// true -> new request will be rejected.
RejectRequest bool

// true -> all requests had been processed. In provider side it means that all requests are returned response to clients
// In consumer side, it means that all requests getting response from servers
RequestsFinished bool
Expand Down
3 changes: 3 additions & 0 deletions filter/filter_impl/graceful_shutdown_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I
return gf.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation)
}
atomic.AddInt32(&gf.activeCount, 1)
if gf.shutdownConfig != nil && gf.activeCount > 0 {
gf.shutdownConfig.RequestsFinished = false
}
return invoker.Invoke(ctx, invocation)
}

Expand Down
8 changes: 3 additions & 5 deletions registry/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,11 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL {

// Destroy registry protocol
func (proto *registryProtocol) Destroy() {
for _, ivk := range proto.invokers {
ivk.Destroy()
}
// invoker.Destroy() should be performed in config.destroyConsumerProtocols().
proto.invokers = []protocol.Invoker{}
proto.bounds.Range(func(key, value interface{}) bool {
exporter := value.(protocol.Exporter)
exporter.Unexport()
// protocol holds the exporters actually, instead, registry holds them in order to avoid export repeatedly, so
// the work for unexport should be finished in protocol.Unexport(), see also config.destroyProviderProtocols().
proto.bounds.Delete(key)
return true
})
Expand Down
1 change: 1 addition & 0 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error)

// CloseAndNilClient closes listeners and clear client
func (r *zkRegistry) CloseAndNilClient() {
r.listener.Close()
r.client.Close()
r.client = nil
}
Expand Down
2 changes: 1 addition & 1 deletion registry/zookeeper/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (zksd *zookeeperServiceDiscovery) String() string {

// Close client be closed
func (zksd *zookeeperServiceDiscovery) Destroy() error {
zksd.client.Close()
zksd.csd.Close()
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions remoting/zookeeper/curator_discovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,12 @@ func (sd *ServiceDiscovery) pathForInstance(name, id string) string {
func (sd *ServiceDiscovery) pathForName(name string) string {
return path.Join(sd.basePath, name)
}

func (sd *ServiceDiscovery) Close() {
if sd.listener != nil {
sd.listener.Close()
}
if sd.client != nil {
sd.client.Close()
}
}

0 comments on commit 7fedc22

Please sign in to comment.