Skip to content

Commit

Permalink
notifyproxy: handle barrier messages
Browse files Browse the repository at this point in the history
Does not fully fix containers#16515 as the BARRIER=1 message can, in theory,
occure in a separate subsequent message that will not be read as
the proxies return/stop when reading the READY=1 message.

[NO NEW TESTS NEEDED] - existing tests are expected to pass and containers#16076
should (finally) stop flaking.:

Fixes: containers#16076
Signed-off-by: Valentin Rothberg <vrothberg@redhat.com>
  • Loading branch information
vrothberg committed Dec 2, 2022
1 parent 51deb32 commit 9d88398
Showing 1 changed file with 68 additions and 20 deletions.
88 changes: 68 additions & 20 deletions pkg/systemd/notifyproxy/notifyproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ import (
"github.com/containers/podman/v4/libpod/define"
"github.com/coreos/go-systemd/v22/daemon"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)

const ( // Values from systemd.
// See https://github.com/containers/podman/issues/16515 for a discription of the protocol.
_notifyRcvbufSize = 8 * 1024 * 1024
_notifyBufferMax = 4096
_notifyFdMax = 768
_notifyBarrierMsg = "BARRIER=1"
_notifyRdyMsg = daemon.SdNotifyReady
)

// SendMessage sends the specified message to the specified socket.
Expand Down Expand Up @@ -76,6 +86,10 @@ func New(tmpDir string) (*NotifyProxy, error) {
return nil, err
}

if err := conn.SetReadBuffer(_notifyRcvbufSize); err != nil {
return nil, fmt.Errorf("setting read buffer: %w", err)
}

errorChan := make(chan error, 1)
readyChan := make(chan bool, 1)

Expand All @@ -100,34 +114,68 @@ func (p *NotifyProxy) waitForReady() {
go func() {
// Read until the `READY` message is received or the connection
// is closed.
const bufferSize = 1024

fdSize := unix.CmsgSpace(4)
buffer := make([]byte, _notifyBufferMax)
oob := make([]byte, _notifyFdMax*fdSize)
sBuilder := strings.Builder{}
for {
for {
buffer := make([]byte, bufferSize)
num, err := p.connection.Read(buffer)
if err != nil {
if !errors.Is(err, io.EOF) {
p.errorChan <- err
return
}
}
sBuilder.Write(buffer[:num])
if num != bufferSize || buffer[num-1] == '\n' {
// Break as we read an entire line that
// we can inspect for the `READY`
// message.
break
n, oobn, flags, _, err := p.connection.ReadMsgUnix(buffer, oob)
if err != nil {
if !errors.Is(err, io.EOF) {
p.errorChan <- err
return
}
logrus.Errorf("Error reading unix message on socket %q: %v", p.socketPath, err)
}

if n > _notifyBufferMax || oobn > _notifyFdMax*fdSize {
logrus.Errorf("Ignoring unix message on socket %q: incorrect number of bytes read (n=%d, oobn=%d)", p.socketPath, n, oobn)
continue
}

if flags&unix.MSG_CTRUNC != 0 {
logrus.Errorf("Ignoring unix message on socket %q: message truncated", p.socketPath)
continue
}

sBuilder.Reset()
sBuilder.Write(buffer[:n])
var isBarrier, isReady bool

for _, line := range strings.Split(sBuilder.String(), "\n") {
if line == daemon.SdNotifyReady {
p.readyChan <- true
return
switch line {
case _notifyRdyMsg:
isReady = true
case _notifyBarrierMsg:
isBarrier = true
}
}
sBuilder.Reset()

if isBarrier {
scms, err := unix.ParseSocketControlMessage(oob)
if err != nil {
logrus.Errorf("parsing control message on socket %q: %v", p.socketPath, err)
}
for _, scm := range scms {
fds, err := unix.ParseUnixRights(&scm)
if err != nil {
logrus.Errorf("parsing unix rights of control message on socket %q: %v", p.socketPath, err)
continue
}
for _, fd := range fds {
if err := unix.Close(fd); err != nil {
logrus.Errorf("closing fd passed on socket %q: %v", fd, err)
continue
}
}
}
}

if isReady {
p.readyChan <- true
return
}
}
}()
}
Expand Down

0 comments on commit 9d88398

Please sign in to comment.