Skip to content

Commit

Permalink
fix: service group not working well when callback takes long time
Browse files Browse the repository at this point in the history
Signed-off-by: kevin <wanjunfeng@gmail.com>
  • Loading branch information
kevwan committed Dec 30, 2024
1 parent 5c3679f commit 1041b50
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 1 deletion.
3 changes: 3 additions & 0 deletions core/proc/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ func (lm *listenerManager) addListener(fn func()) (waitForCalled func()) {
})
lm.lock.Unlock()

// we can return lm.waitGroup.Wait directly,
// but we want to make the returned func more readable.
// creating an extra closure would be negligible in practice.
return func() {
lm.waitGroup.Wait()
}
Expand Down
37 changes: 37 additions & 0 deletions core/proc/shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package proc

import (
"sync/atomic"
"testing"
"time"

Expand All @@ -29,6 +30,42 @@ func TestShutdown(t *testing.T) {
assert.Equal(t, 3, val)
}

func TestShutdownWithMultipleServices(t *testing.T) {
SetTimeToForceQuit(time.Hour)
assert.Equal(t, time.Hour, delayTimeBeforeForceQuit)

var val int32
called1 := AddShutdownListener(func() {
atomic.AddInt32(&val, 1)
})
called2 := AddShutdownListener(func() {
atomic.AddInt32(&val, 2)
})
Shutdown()
called1()
called2()

assert.Equal(t, int32(3), atomic.LoadInt32(&val))
}

func TestWrapUpWithMultipleServices(t *testing.T) {
SetTimeToForceQuit(time.Hour)
assert.Equal(t, time.Hour, delayTimeBeforeForceQuit)

var val int32
called1 := AddWrapUpListener(func() {
atomic.AddInt32(&val, 1)
})
called2 := AddWrapUpListener(func() {
atomic.AddInt32(&val, 2)
})
WrapUp()
called1()
called2()

assert.Equal(t, int32(3), atomic.LoadInt32(&val))
}

func TestNotifyMoreThanOnce(t *testing.T) {
ch := make(chan struct{}, 1)

Expand Down
7 changes: 6 additions & 1 deletion core/service/servicegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,14 @@ func (sg *ServiceGroup) doStart() {
}

func (sg *ServiceGroup) doStop() {
group := threading.NewRoutineGroup()
for _, service := range sg.services {
service.Stop()
// new variable to avoid closure problems, can be removed after go 1.22
// see https://golang.org/doc/faq#closures_and_goroutines
service := service
group.Run(service.Stop)
}
group.Wait()
}

// WithStart wraps a start func as a Service.
Expand Down
17 changes: 17 additions & 0 deletions gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jhump/protoreflect/grpcreflect"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mr"
"github.com/zeromicro/go-zero/core/threading"
"github.com/zeromicro/go-zero/gateway/internal"
"github.com/zeromicro/go-zero/rest"
"github.com/zeromicro/go-zero/rest/httpx"
Expand All @@ -23,6 +24,7 @@ type (
Server struct {
*rest.Server
upstreams []Upstream
conns []zrpc.Client
processHeader func(http.Header) []string
dialer func(conf zrpc.RpcClientConf) zrpc.Client
}
Expand Down Expand Up @@ -52,6 +54,20 @@ func (s *Server) Start() {

// Stop stops the gateway server.
func (s *Server) Stop() {
group := threading.NewRoutineGroup()

for _, conn := range s.conns {
// new variable to avoid closure problems, can be removed after go 1.22
// see https://golang.org/doc/faq#closures_and_goroutines
conn := conn
group.Run(func() {
if err := conn.Conn().Close(); err != nil {
logx.Error(err)
}

Check warning on line 66 in gateway/server.go

View check run for this annotation

Codecov / codecov/patch

gateway/server.go#L65-L66

Added lines #L65 - L66 were not covered by tests
})
}

group.Wait()
s.Server.Stop()
}

Expand All @@ -71,6 +87,7 @@ func (s *Server) build() error {
} else {
cli = zrpc.MustNewClient(up.Grpc)
}
s.conns = append(s.conns, cli)

source, err := s.createDescriptorSource(cli, up)
if err != nil {
Expand Down

0 comments on commit 1041b50

Please sign in to comment.