From a5cc7fe1de232ff6de54ef4a59bd514e220a7958 Mon Sep 17 00:00:00 2001 From: naonao2323 <74669884+naonao2323@users.noreply.github.com> Date: Mon, 2 Sep 2024 18:07:37 +0900 Subject: [PATCH] refactor: flushAll method to wait for all log flushes to complete (#5162) Signed-off-by: naonao2323 --- pkg/app/piped/logpersister/persister.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/app/piped/logpersister/persister.go b/pkg/app/piped/logpersister/persister.go index cd313a0370..b9b54e42fc 100644 --- a/pkg/app/piped/logpersister/persister.go +++ b/pkg/app/piped/logpersister/persister.go @@ -24,6 +24,7 @@ import ( "time" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice" @@ -157,17 +158,26 @@ func (p *persister) flush(ctx context.Context) (flushes, deletes int) { } func (p *persister) flushAll(ctx context.Context) int { + group, ctx := errgroup.WithContext(ctx) var num = 0 p.stagePersisters.Range(func(_, v interface{}) bool { sp := v.(*stageLogPersister) if !sp.isStale(p.stalePeriod) { + group.Go(func() error { + return sp.flushFromLastCheckpoint(ctx) + }) num++ - go sp.flushFromLastCheckpoint(ctx) } return true }) - + if err := group.Wait(); err != nil { + p.logger.Error( + "failed to flush all stage persisters", + zap.Error(err), + ) + return num + } p.logger.Info(fmt.Sprintf("flushing all of %d stage persisters", num)) return num }