From e953cbfd327e4f8a864ac7b0a03b399d67db560d Mon Sep 17 00:00:00 2001 From: Valentin Rothberg Date: Fri, 2 Dec 2022 12:51:30 +0100 Subject: [PATCH] notifyproxy: handle barrier messages Does not fully fix #16515 as the BARRIER=1 message can, in theory, occure in a separate subsequent message that will not be read as the proxies return/stop when reading the READY=1 message. [NO NEW TESTS NEEDED] - existing tests are expected to pass and #16076 should (finally) stop flaking. Fixes: #16076 Signed-off-by: Valentin Rothberg --- pkg/systemd/notifyproxy/notifyproxy.go | 89 ++++++++++++++++++++------ 1 file changed, 69 insertions(+), 20 deletions(-) diff --git a/pkg/systemd/notifyproxy/notifyproxy.go b/pkg/systemd/notifyproxy/notifyproxy.go index b5010cbc85e5..d8cc51a42c69 100644 --- a/pkg/systemd/notifyproxy/notifyproxy.go +++ b/pkg/systemd/notifyproxy/notifyproxy.go @@ -14,6 +14,16 @@ import ( "github.com/containers/podman/v4/libpod/define" "github.com/coreos/go-systemd/v22/daemon" "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" +) + +const ( // Values from systemd. + // See https://github.com/containers/podman/issues/16515 for a discription of the protocol. + _notifyRcvbufSize = 8 * 1024 * 1024 + _notifyBufferMax = 4096 + _notifyFdMax = 768 + _notifyBarrierMsg = "BARRIER=1" + _notifyRdyMsg = daemon.SdNotifyReady ) // SendMessage sends the specified message to the specified socket. @@ -76,6 +86,10 @@ func New(tmpDir string) (*NotifyProxy, error) { return nil, err } + if err := conn.SetReadBuffer(_notifyRcvbufSize); err != nil { + return nil, fmt.Errorf("setting read buffer: %w", err) + } + errorChan := make(chan error, 1) readyChan := make(chan bool, 1) @@ -100,34 +114,69 @@ func (p *NotifyProxy) waitForReady() { go func() { // Read until the `READY` message is received or the connection // is closed. - const bufferSize = 1024 + + fdSize := unix.CmsgSpace(4) sBuilder := strings.Builder{} for { - for { - buffer := make([]byte, bufferSize) - num, err := p.connection.Read(buffer) - if err != nil { - if !errors.Is(err, io.EOF) { - p.errorChan <- err - return - } - } - sBuilder.Write(buffer[:num]) - if num != bufferSize || buffer[num-1] == '\n' { - // Break as we read an entire line that - // we can inspect for the `READY` - // message. - break + buffer := make([]byte, _notifyBufferMax) + oob := make([]byte, _notifyFdMax*fdSize) + + n, oobn, flags, _, err := p.connection.ReadMsgUnix(buffer, oob) + if err != nil { + if !errors.Is(err, io.EOF) { + p.errorChan <- err + return } + logrus.Errorf("Error reading unix message on socket %q: %v", p.socketPath, err) + } + + if n > _notifyBufferMax || oobn > _notifyFdMax*fdSize { + logrus.Errorf("Ignoring unix message on socket %q: incorrect number of bytes read (n=%d, oobn=%d)", p.socketPath, n, oobn) + continue } + if flags&unix.MSG_CTRUNC != 0 { + logrus.Errorf("Ignoring unix message on socket %q: message truncated", p.socketPath) + continue + } + + sBuilder.Reset() + sBuilder.Write(buffer[:n]) + var isBarrier, isReady bool + for _, line := range strings.Split(sBuilder.String(), "\n") { - if line == daemon.SdNotifyReady { - p.readyChan <- true - return + switch line { + case _notifyRdyMsg: + isReady = true + case _notifyBarrierMsg: + isBarrier = true } } - sBuilder.Reset() + + if isBarrier { + scms, err := unix.ParseSocketControlMessage(oob) + if err != nil { + logrus.Errorf("parsing control message on socket %q: %v", p.socketPath, err) + } + for _, scm := range scms { + fds, err := unix.ParseUnixRights(&scm) + if err != nil { + logrus.Errorf("parsing unix rights of control message on socket %q: %v", p.socketPath, err) + continue + } + for _, fd := range fds { + if err := unix.Close(fd); err != nil { + logrus.Errorf("closing fd passed on socket %q: %v", fd, err) + continue + } + } + } + } + + if isReady { + p.readyChan <- true + return + } } }() }