From bd0da229d85872969d9a5bd16943eacdc9fac1b2 Mon Sep 17 00:00:00 2001 From: Berger Eugene Date: Sat, 8 Jun 2024 18:04:36 +0300 Subject: [PATCH] Issue #191: Fix logs channel race condition --- issues/issue_191/process-compose.yaml | 4 ++++ src/api/ws_api.go | 34 ++++++++++++++++++--------- src/client/client.go | 3 ++- src/client/logs.go | 22 ++++------------- src/cmd/logs.go | 6 +++-- 5 files changed, 37 insertions(+), 32 deletions(-) create mode 100644 issues/issue_191/process-compose.yaml diff --git a/issues/issue_191/process-compose.yaml b/issues/issue_191/process-compose.yaml new file mode 100644 index 0000000..bb697cf --- /dev/null +++ b/issues/issue_191/process-compose.yaml @@ -0,0 +1,4 @@ +version: "0.5" +processes: + noisy: + command: while true; do date; done diff --git a/src/api/ws_api.go b/src/api/ws_api.go index c21b996..bfd5e96 100644 --- a/src/api/ws_api.go +++ b/src/api/ws_api.go @@ -7,6 +7,7 @@ import ( "github.com/rs/zerolog/log" "net/http" "strconv" + "sync" ) var upgrader = websocket.Upgrader{} @@ -27,23 +28,34 @@ func (api *PcApi) HandleLogsStream(c *gin.Context) { done := make(chan struct{}) logChan := make(chan LogMessage, 256) - connector := pclog.NewConnector(func(messages []string) { - for _, message := range messages { - msg := LogMessage{ - Message: message, - ProcessName: procName, + chanCloseMtx := &sync.Mutex{} + isChannelClosed := false + connector := pclog.NewConnector( + func(messages []string) { + for _, message := range messages { + msg := LogMessage{ + Message: message, + ProcessName: procName, + } + logChan <- msg } - logChan <- msg - } - if !follow { - close(logChan) - } - }, + if !follow { + chanCloseMtx.Lock() + defer chanCloseMtx.Unlock() + close(logChan) + isChannelClosed = true + } + }, func(message string) (n int, err error) { msg := LogMessage{ Message: message, ProcessName: procName, } + chanCloseMtx.Lock() + defer chanCloseMtx.Unlock() + if isChannelClosed { + return 0, nil + } logChan <- msg return len(message), nil }, diff --git a/src/client/client.go b/src/client/client.go index 39e9ff0..0f6bab5 100644 --- a/src/client/client.go +++ b/src/client/client.go @@ -74,7 +74,8 @@ func (p *PcClient) GetLogLength() int { } func (p *PcClient) GetLogsAndSubscribe(name string, observer pclog.LogObserver) error { - return p.logger.ReadProcessLogs(name, p.logLength, true, observer) + _, err := p.logger.ReadProcessLogs(name, p.logLength, true, observer) + return err } func (p *PcClient) UnSubscribeLogger(name string, observer pclog.LogObserver) error { diff --git a/src/client/logs.go b/src/client/logs.go index a870a3a..438df22 100644 --- a/src/client/logs.go +++ b/src/client/logs.go @@ -27,7 +27,7 @@ func NewLogClient(address, socketPath string) *LogClient { } } -func (l *LogClient) ReadProcessLogs(name string, offset int, follow bool, out io.StringWriter) (err error) { +func (l *LogClient) ReadProcessLogs(name string, offset int, follow bool, out io.StringWriter) (done chan struct{}, err error) { url := fmt.Sprintf("ws://%s/process/logs/ws?name=%s&offset=%d&follow=%v", l.address, name, offset, follow) log.Info().Msgf("Connecting to %s", url) @@ -42,28 +42,14 @@ func (l *LogClient) ReadProcessLogs(name string, offset int, follow bool, out io if err != nil { log.Error().Msgf("failed to dial to %s error: %v", url, err) - return err + return done, err } //defer l.ws.Close() - done := make(chan struct{}) + done = make(chan struct{}) go l.readLogs(done, l.ws, follow, out) - /*for { - select { - case <-done: - return nil - case <-interrupt: - fmt.Println("interrupt") - - select { - case <-done: - case <-time.After(time.Second): - } - return nil - } - }*/ - return nil + return done, nil } // CloseChannel Cleanly close the connection by sending a close message and then diff --git a/src/cmd/logs.go b/src/cmd/logs.go index a580ace..82df1e9 100644 --- a/src/cmd/logs.go +++ b/src/cmd/logs.go @@ -19,7 +19,7 @@ var logsCmd = &cobra.Command{ name := args[0] logger := getLogClient() - err := logger.ReadProcessLogs(name, *pcFlags.LogTailLength, *pcFlags.LogFollow, os.Stdout) + done, err := logger.ReadProcessLogs(name, *pcFlags.LogTailLength, *pcFlags.LogFollow, os.Stdout) if err != nil { log.Fatal().Err(err).Msgf("Failed to fetch logs for process %s", name) } @@ -27,7 +27,9 @@ var logsCmd = &cobra.Command{ signal.Notify(interrupt, os.Interrupt) select { case <-interrupt: - fmt.Println("interrupt") + _ = logger.CloseChannel() + time.Sleep(time.Second) + case <-done: _ = logger.CloseChannel() time.Sleep(time.Second) }