Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: close SMTP submission correctly to handle errors #4006

Merged
merged 3 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/coder/quartz v0.1.0
github.com/emersion/go-smtp v0.21.3
github.com/go-kit/log v0.2.1
github.com/go-openapi/analysis v0.23.0
github.com/go-openapi/errors v0.22.0
Expand Down Expand Up @@ -62,6 +63,7 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 h1:OJyUGMJTzHTd1XQp98QTaHernxMYzRaOasRir9hUlFQ=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
github.com/emersion/go-smtp v0.21.3 h1:7uVwagE8iPYE48WhNsng3RRpCUpFvNl39JGNSIyGVMY=
github.com/emersion/go-smtp v0.21.3/go.mod h1:qm27SGYgoIPRot6ubfQ/GpiPy/g3PaZAVRxiO/sDUgQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
16 changes: 15 additions & 1 deletion notify/email/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"net/textproto"
"os"
"strings"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -242,7 +243,15 @@ func (n *Email) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
if err != nil {
return true, fmt.Errorf("send DATA command: %w", err)
}
defer message.Close()
closeOnce := sync.OnceValue(func() error {
return message.Close()
})
// Close the message when this method exits in order to not leak resources. Even though we're calling this explicitly
// further down, the method may exit before then.
defer func() {
// If we try close an already-closed writer, it'll send a subsequent request to the server which is invalid.
_ = closeOnce()
}()

buffer := &bytes.Buffer{}
for header, t := range n.conf.Headers {
Expand Down Expand Up @@ -331,6 +340,11 @@ func (n *Email) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
return false, fmt.Errorf("write body buffer: %w", err)
}

// Complete the message and await response.
if err = closeOnce(); err != nil {
return true, fmt.Errorf("delivery failure: %w", err)
}

success = true
return false, nil
}
Expand Down
177 changes: 161 additions & 16 deletions notify/email/email_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,22 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/emersion/go-smtp"
"github.com/go-kit/log"
commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"

// nolint:depguard // require cannot be called outside the main goroutine: https://pkg.go.dev/testing#T.FailNow
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -165,29 +171,16 @@ func notifyEmail(cfg *config.EmailConfig, server *mailDev) (*email, bool, error)
// notifyEmailWithContext sends a notification with one firing alert and retrieves the
// email from the SMTP server if the notification has been successfully delivered.
func notifyEmailWithContext(ctx context.Context, cfg *config.EmailConfig, server *mailDev) (*email, bool, error) {
if cfg.RequireTLS == nil {
cfg.RequireTLS = new(bool)
}
if cfg.Headers == nil {
cfg.Headers = make(map[string]string)
}
firingAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{},
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
},
}
err := server.deleteAllEmails()
tmpl, firingAlert, err := prepare(cfg)
if err != nil {
return nil, false, err
}

tmpl, err := template.FromGlobs([]string{})
err = server.deleteAllEmails()
if err != nil {
return nil, false, err
}
tmpl.ExternalURL, _ = url.Parse("http://am")

email := New(cfg, tmpl, log.NewNopLogger())

retry, err := email.Notify(ctx, firingAlert)
Expand All @@ -204,6 +197,34 @@ func notifyEmailWithContext(ctx context.Context, cfg *config.EmailConfig, server
return e, retry, nil
}

func prepare(cfg *config.EmailConfig) (*template.Template, *types.Alert, error) {
if cfg == nil {
panic("nil config passed")
}

if cfg.RequireTLS == nil {
cfg.RequireTLS = new(bool)
}
if cfg.Headers == nil {
cfg.Headers = make(map[string]string)
}

tmpl, err := template.FromGlobs([]string{})
if err != nil {
return nil, nil, err
}
tmpl.ExternalURL, _ = url.Parse("http://am")

firingAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{},
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
},
}
return tmpl, firingAlert, nil
}

// TestEmailNotifyWithErrors tries to send emails with buggy inputs.
func TestEmailNotifyWithErrors(t *testing.T) {
cfgFile := os.Getenv(emailNoAuthConfigVar)
Expand Down Expand Up @@ -643,3 +664,127 @@ func TestEmailNoUsernameStillOk(t *testing.T) {
require.NoError(t, err)
require.Nil(t, a)
}

// TestEmailRejected simulates the failure of an otherwise valid message submission which fails at a later point than
// was previously expected by the code.
func TestEmailRejected(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
t.Cleanup(cancel)

// Setup mock SMTP server which will reject at the DATA stage.
srv, l, err := mockSMTPServer(t)
require.NoError(t, err)
t.Cleanup(func() {
// We expect that the server has already been closed in the test.
require.ErrorIs(t, srv.Shutdown(ctx), smtp.ErrServerClosed)
})

done := make(chan any, 1)
go func() {
// nolint:testifylint // require cannot be called outside the main goroutine: https://pkg.go.dev/testing#T.FailNow
assert.NoError(t, srv.Serve(l))
close(done)
}()

// Wait for mock SMTP server to become ready.
require.Eventuallyf(t, func() bool {
c, err := smtp.Dial(srv.Addr)
if err != nil {
t.Logf("dial failed to %q: %s", srv.Addr, err)
return false
}

// Ping.
if err = c.Noop(); err != nil {
t.Logf("ping failed to %q: %s", srv.Addr, err)
return false
}

// Ensure we close the connection to not prevent server from shutting down cleanly.
if err = c.Close(); err != nil {
t.Logf("close failed to %q: %s", srv.Addr, err)
return false
}

return true
}, time.Second*10, time.Millisecond*100, "mock SMTP server failed to start")

// Use mock SMTP server and prepare alert to be sent.
require.IsType(t, &net.TCPAddr{}, l.Addr())
addr := l.Addr().(*net.TCPAddr)
cfg := &config.EmailConfig{
Smarthost: config.HostPort{Host: addr.IP.String(), Port: strconv.Itoa(addr.Port)},
Hello: "localhost",
Headers: make(map[string]string),
From: "alertmanager@system",
To: "sre@company",
}
tmpl, firingAlert, err := prepare(cfg)
require.NoError(t, err)

e := New(cfg, tmpl, log.NewNopLogger())

// Send the alert to mock SMTP server.
retry, err := e.Notify(context.Background(), firingAlert)
require.ErrorContains(t, err, "501 5.5.4 Rejected!")
require.True(t, retry)
require.NoError(t, srv.Shutdown(ctx))

require.Eventuallyf(t, func() bool {
<-done
return true
}, time.Second*10, time.Millisecond*100, "mock SMTP server goroutine failed to close in time")
}

func mockSMTPServer(t *testing.T) (*smtp.Server, net.Listener, error) {
t.Helper()

// Listen on the next available high port.
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, nil, fmt.Errorf("connect: %w", err)
}

addr, ok := l.Addr().(*net.TCPAddr)
if !ok {
return nil, nil, fmt.Errorf("unexpected address type: %T", l.Addr())
}

s := smtp.NewServer(&rejectingBackend{})
s.Addr = addr.String()
s.WriteTimeout = 10 * time.Second
s.ReadTimeout = 10 * time.Second

return s, l, nil
}

// rejectingBackend will reject submission at the DATA stage.
type rejectingBackend struct{}

func (b *rejectingBackend) NewSession(c *smtp.Conn) (smtp.Session, error) {
return &mockSMTPSession{
conn: c,
backend: b,
}, nil
}

type mockSMTPSession struct {
conn *smtp.Conn
backend smtp.Backend
}

func (s *mockSMTPSession) Mail(string, *smtp.MailOptions) error {
return nil
}

func (s *mockSMTPSession) Rcpt(string, *smtp.RcptOptions) error {
return nil
}

func (s *mockSMTPSession) Data(io.Reader) error {
return &smtp.SMTPError{Code: 501, EnhancedCode: smtp.EnhancedCode{5, 5, 4}, Message: "Rejected!"}
}

func (*mockSMTPSession) Reset() {}

func (*mockSMTPSession) Logout() error { return nil }
Loading