From 5eca357c4a18562f6a256fbb891a8977a4124e68 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 2 Dec 2019 14:01:41 +0100 Subject: [PATCH 01/10] *: support authentication and TLS for Alertmanager This change adds support for authentication with basic auth, client certificates and bearer tokens. It also enables to configure TLS settings for the Alertmanager endpoints. Most of the work leverages the existing Prometheus configuration format and code. In particular TLS certificate files are automatically reloaded whenever they change. Signed-off-by: Simon Pasquier --- cmd/thanos/rule.go | 185 ++++++++------------ docs/components/rule.md | 50 +++++- docs/service-discovery.md | 6 +- pkg/alert/alert.go | 108 ++++-------- pkg/alert/alert_test.go | 124 ++++++------- pkg/alert/client.go | 360 ++++++++++++++++++++++++++++++++++++++ pkg/alert/client_test.go | 84 +++++++++ scripts/cfggen/main.go | 24 +-- 8 files changed, 675 insertions(+), 266 deletions(-) create mode 100644 pkg/alert/client.go create mode 100644 pkg/alert/client_test.go diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 115be52ff8..6b451e72b2 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "math/rand" - "net" "net/http" "net/url" "os" @@ -13,7 +12,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "syscall" "time" @@ -83,8 +81,8 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { alertmgrs := cmd.Flag("alertmanagers.url", "Alertmanager replica URLs to push firing alerts. Ruler claims success if push to at least one alertmanager from discovered succeeds. The scheme should not be empty e.g `http` might be used. The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect Alertmanager IPs through respective DNS lookups. The port defaults to 9093 or the SRV record's value. The URL path is used as a prefix for the regular Alertmanager API path."). Strings() - - alertmgrsTimeout := cmd.Flag("alertmanagers.send-timeout", "Timeout for sending alerts to alertmanager").Default("10s").Duration() + alertmgrsTimeout := cmd.Flag("alertmanagers.send-timeout", "Timeout for sending alerts to Alertmanager").Default("10s").Duration() + alertmgrsConfig := extflag.RegisterPathOrContent(cmd, "alertmanagers.config", "YAML file that contains alerting configuration. See format details: https://thanos.io/components/rule.md/#configuration. If defined, it takes precedence over the '--alertmanagers.url' and '--alertmanagers.send-timeout' flags.", false) alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field").String() @@ -157,6 +155,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { lset, *alertmgrs, *alertmgrsTimeout, + alertmgrsConfig, *grpcBindAddr, time.Duration(*grpcGracePeriod), *grpcCert, @@ -194,6 +193,7 @@ func runRule( lset labels.Labels, alertmgrURLs []string, alertmgrsTimeout time.Duration, + alertmgrsConfig *extflag.PathOrContent, grpcBindAddr string, grpcGracePeriod time.Duration, grpcCert string, @@ -286,11 +286,48 @@ func runRule( dns.ResolverType(dnsSDResolver), ) + // Build the Alertmanager clients. 
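+	// A minimal sketch of the YAML carried by the --alertmanagers.config* flags
+	// (hostname is illustrative; see docs/components/rule.md for the full format):
+	//
+	//   alertmanagers:
+	//   - static_configs: ["alertmanager.example.com:9093"]
+	//     scheme: http
+	//     timeout: 10s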
+ alertmgrsConfigYAML, err := alertmgrsConfig.Content() + if err != nil { + return err + } + var ( + alertingcfg alert.AlertingConfig + alertmgrs []*alert.Alertmanager + ) + if len(alertmgrsConfigYAML) > 0 { + if len(alertmgrURLs) != 0 { + level.Warn(logger).Log("msg", "ignoring --alertmanagers.url flag because --alertmanagers.config* flag is also defined") + } + alertingcfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML) + if err != nil { + return err + } + } else { + // Build the Alertmanager configuration from the legacy flags. + for _, addr := range alertmgrURLs { + cfg, err := alert.BuildAlertmanagerConfig(logger, addr, alertmgrsTimeout) + if err != nil { + return err + } + alertingcfg.Alertmanagers = append(alertingcfg.Alertmanagers, cfg) + } + } + if len(alertingcfg.Alertmanagers) == 0 { + level.Warn(logger).Log("msg", "no alertmanager configured") + } + for _, cfg := range alertingcfg.Alertmanagers { + am, err := alert.NewAlertmanager(logger, cfg) + if err != nil { + return err + } + alertmgrs = append(alertmgrs, am) + } + // Run rule evaluation and alert notifications. var ( - alertmgrs = newAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver)) - alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) - ruleMgr = thanosrule.NewManager(dataDir) + alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) + ruleMgr = thanosrule.NewManager(dataDir) ) { notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { @@ -351,9 +388,40 @@ func runRule( }) } } + // Discover and resolve Alertmanager addresses. { - // TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660. - sdr := alert.NewSender(logger, reg, alertmgrs.get, nil, alertmgrsTimeout) + resolver := dns.NewResolver(dns.ResolverType(dnsSDResolver).ToResolver(logger)) + + for i := range alertmgrs { + am := alertmgrs[i] + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + am.Discover(ctx) + return nil + }, func(error) { + cancel() + }) + + g.Add(func() error { + return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + if err := am.Update(ctx, resolver); err != nil { + level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err) + alertMngrAddrResolutionErrors.Inc() + } + return nil + }) + }, func(error) { + cancel() + }) + } + } + // Run the alert sender. + { + doers := make([]alert.AlertmanagerDoer, len(alertmgrs)) + for i := range alertmgrs { + doers[i] = alertmgrs[i] + } + sdr := alert.NewSender(logger, reg, doers) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { @@ -370,21 +438,6 @@ func runRule( cancel() }) } - { - ctx, cancel := context.WithCancel(context.Background()) - - g.Add(func() error { - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { - if err := alertmgrs.update(ctx); err != nil { - level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err) - alertMngrAddrResolutionErrors.Inc() - } - return nil - }) - }, func(error) { - cancel() - }) - } // Run File Service Discovery and update the query addresses when the files are modified. 
if fileSD != nil { var fileSDUpdates chan []*targetgroup.Group @@ -615,90 +668,6 @@ func runRule( return nil } -type alertmanagerSet struct { - resolver dns.Resolver - addrs []string - mtx sync.Mutex - current []*url.URL -} - -func newAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet { - return &alertmanagerSet{ - resolver: dns.NewResolver(dnsSDResolver.ToResolver(logger)), - addrs: addrs, - } -} - -func (s *alertmanagerSet) get() []*url.URL { - s.mtx.Lock() - defer s.mtx.Unlock() - return s.current -} - -const defaultAlertmanagerPort = 9093 - -func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) { - qType = "" - parsedUrl, err = url.Parse(addr) - if err != nil { - return qType, nil, err - } - // The Scheme might contain DNS resolver type separated by + so we split it a part. - if schemeParts := strings.Split(parsedUrl.Scheme, "+"); len(schemeParts) > 1 { - parsedUrl.Scheme = schemeParts[len(schemeParts)-1] - qType = dns.QType(strings.Join(schemeParts[:len(schemeParts)-1], "+")) - } - return qType, parsedUrl, err -} - -func (s *alertmanagerSet) update(ctx context.Context) error { - var result []*url.URL - for _, addr := range s.addrs { - var ( - qtype dns.QType - resolvedDomain []string - ) - - qtype, u, err := parseAlertmanagerAddress(addr) - if err != nil { - return errors.Wrapf(err, "parse URL %q", addr) - } - - // Get only the host and resolve it if needed. - host := u.Host - if qtype != "" { - if qtype == dns.A { - _, _, err = net.SplitHostPort(host) - if err != nil { - // The host could be missing a port. Append the defaultAlertmanagerPort. - host = host + ":" + strconv.Itoa(defaultAlertmanagerPort) - } - } - resolvedDomain, err = s.resolver.Resolve(ctx, host, qtype) - if err != nil { - return errors.Wrap(err, "alertmanager resolve") - } - } else { - resolvedDomain = []string{host} - } - - for _, resolved := range resolvedDomain { - result = append(result, &url.URL{ - Scheme: u.Scheme, - Host: resolved, - Path: u.Path, - User: u.User, - }) - } - } - - s.mtx.Lock() - s.current = result - s.mtx.Unlock() - - return nil -} - func parseFlagLabels(s []string) (labels.Labels, error) { var lset labels.Labels for _, l := range s { diff --git a/docs/components/rule.md b/docs/components/rule.md index c82d82b8e7..7d6f46586a 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -209,7 +209,23 @@ Flags: record's value. The URL path is used as a prefix for the regular Alertmanager API path. --alertmanagers.send-timeout=10s - Timeout for sending alerts to alertmanager + Timeout for sending alerts to Alertmanager + --alertmanagers.config-file= + Path to YAML file that contains alerting + configuration. See format details: + https://thanos.io/components/rule.md/#configuration. + If defined, it takes precedence over the + '--alertmanagers.url' and + '--alertmanagers.send-timeout' flags. + --alertmanagers.config= + Alternative to 'alertmanagers.config-file' flag + (lower priority). Content of YAML file that + contains alerting configuration. See format + details: + https://thanos.io/components/rule.md/#configuration. + If defined, it takes precedence over the + '--alertmanagers.url' and + '--alertmanagers.send-timeout' flags. --alert.query-url=ALERT.QUERY-URL The external Thanos Query URL that would be set in all alerts 'Source' field @@ -266,3 +282,35 @@ Flags: Interval between DNS resolutions. 
``` + +## Configuration + +### Alertmanager + +The configuration format supported by the `--alertmanagers.config` and `--alertmanagers.config-file` flags is the following: + +[embedmd]:# (../flags/config_rule_alerting.txt yaml) +```yaml +alertmanagers: +- http_config: + basic_auth: + username: "" + password: "" + password_file: "" + bearer_token: "" + bearer_token_file: "" + proxy_url: "" + tls_config: + ca_file: "" + cert_file: "" + key_file: "" + server_name: "" + insecure_skip_verify: false + static_configs: [] + file_sd_configs: + - files: [] + refresh_interval: 0s + scheme: http + path_prefix: "" + timeout: 10s +``` diff --git a/docs/service-discovery.md b/docs/service-discovery.md index cdb44e8493..0c76b733c0 100644 --- a/docs/service-discovery.md +++ b/docs/service-discovery.md @@ -13,7 +13,7 @@ SD is currently used in the following places within Thanos: * `Thanos Query` needs to know about [StoreAPI](https://github.com/thanos-io/thanos/blob/d3fb337da94d11c78151504b1fccb1d7e036f394/pkg/store/storepb/rpc.proto#L14) servers in order to query metrics from them. * `Thanos Rule` needs to know about `QueryAPI` servers in order to evaluate recording and alerting rules. -* `Thanos Rule` needs to know about `Alertmanagers` HA replicas in order to send alerts; only static option with DNS discovery. +* `Thanos Rule` needs to know about `Alertmanagers` HA replicas in order to send alerts. There are currently several ways to configure SD, described below in more detail: @@ -33,7 +33,7 @@ The repeatable flag `--store=` can be used to specify a `StoreAPI` that ` The repeatable flag `--query=` can be used to specify a `QueryAPI` that `Thanos Rule` should use. -The repeatable flag `--alertmanager.url=` can be used to specify a `Alertmanager API` that `Thanos Rule` should use. +The repeatable flag `--alertmanagers.url=` can be used to specify a `Alertmanager API` that `Thanos Rule` should use. ## File Service Discovery @@ -77,6 +77,8 @@ Again, the `` can be a glob pattern. The flag `--query.sd-interval=<5m>` can be used to change the fallback re-read interval. +`Thanos Rule` also supports the configuration of Alertmanager endpoints using YAML with the `--alertmanagers.config=` and `--alertmanagers.config-file=` flags. + ## DNS Service Discovery DNS Service Discovery is another mechanism for finding components that can be used in conjunction with Static Flags or File SD. diff --git a/pkg/alert/alert.go b/pkg/alert/alert.go index 6af16674e8..c443f71555 100644 --- a/pkg/alert/alert.go +++ b/pkg/alert/alert.go @@ -2,34 +2,21 @@ package alert import ( - "bytes" "context" "encoding/json" "fmt" - "net/http" "net/url" - "path" "sync" "sync/atomic" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/version" - "github.com/prometheus/prometheus/pkg/labels" - - "github.com/thanos-io/thanos/pkg/runutil" -) -const ( - alertPushEndpoint = "/api/v1/alerts" - contentTypeJSON = "application/json" + "github.com/prometheus/prometheus/pkg/labels" ) -var userAgent = fmt.Sprintf("Thanos/%s", version.Version) - // Alert is a generic representation of an alert in the Prometheus eco-system. 
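// Alerts are batched, JSON-encoded and POSTed to the Alertmanager alerts push
// endpoint by the Sender defined below.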
type Alert struct { // Label value pairs for purpose of aggregation, matching, and disposition @@ -251,12 +238,15 @@ func (q *Queue) Push(alerts []*Alert) { } } +type AlertmanagerDoer interface { + Endpoints() []*url.URL + Do(context.Context, *url.URL, []byte) error +} + // Sender sends notifications to a dynamic set of alertmanagers. type Sender struct { logger log.Logger - alertmanagers func() []*url.URL - doReq func(req *http.Request) (*http.Response, error) - timeout time.Duration + alertmanagers []AlertmanagerDoer sent *prometheus.CounterVec errs *prometheus.CounterVec @@ -269,21 +259,14 @@ type Sender struct { func NewSender( logger log.Logger, reg prometheus.Registerer, - alertmanagers func() []*url.URL, - doReq func(req *http.Request) (*http.Response, error), - timeout time.Duration, + alertmanagers []AlertmanagerDoer, ) *Sender { - if doReq == nil { - doReq = http.DefaultClient.Do - } if logger == nil { logger = log.NewNopLogger() } s := &Sender{ logger: logger, alertmanagers: alertmanagers, - doReq: doReq, - timeout: timeout, sent: prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "thanos_alert_sender_alerts_sent_total", @@ -311,7 +294,7 @@ func NewSender( return s } -// Send an alert batch to all given Alertmanager URLs. +// Send an alert batch to all given Alertmanager client. // TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660. func (s *Sender) Send(ctx context.Context, alerts []*Alert) { if len(alerts) == 0 { @@ -327,33 +310,29 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) { wg sync.WaitGroup numSuccess uint64 ) - amrs := s.alertmanagers() - for _, u := range amrs { - amURL := *u - sendCtx, cancel := context.WithTimeout(ctx, s.timeout) - - wg.Add(1) - go func() { - defer wg.Done() - defer cancel() - - start := time.Now() - amURL.Path = path.Join(amURL.Path, alertPushEndpoint) - - if err := s.sendOne(sendCtx, amURL.String(), b); err != nil { - level.Warn(s.logger).Log( - "msg", "sending alerts failed", - "alertmanager", amURL.Host, - "numAlerts", len(alerts), - "err", err) - s.errs.WithLabelValues(amURL.Host).Inc() - return - } - s.latency.WithLabelValues(amURL.Host).Observe(time.Since(start).Seconds()) - s.sent.WithLabelValues(amURL.Host).Add(float64(len(alerts))) - - atomic.AddUint64(&numSuccess, 1) - }() + for _, amc := range s.alertmanagers { + for _, u := range amc.Endpoints() { + wg.Add(1) + go func(amc AlertmanagerDoer, u *url.URL) { + defer wg.Done() + + level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts)) + start := time.Now() + if err := amc.Do(ctx, u, b); err != nil { + level.Warn(s.logger).Log( + "msg", "sending alerts failed", + "alertmanager", u.Host, + "numAlerts", len(alerts), + "err", err) + s.errs.WithLabelValues(u.Host).Inc() + return + } + s.latency.WithLabelValues(u.Host).Observe(time.Since(start).Seconds()) + s.sent.WithLabelValues(u.Host).Add(float64(len(alerts))) + + atomic.AddUint64(&numSuccess, 1) + }(amc, u) + } } wg.Wait() @@ -362,26 +341,5 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) { } s.dropped.Add(float64(len(alerts))) - level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "alertmanagers", amrs, "alerts", string(b)) -} - -func (s *Sender) sendOne(ctx context.Context, url string, b []byte) error { - req, err := http.NewRequest("POST", url, bytes.NewReader(b)) - if err != nil { - return err - } - req = req.WithContext(ctx) - req.Header.Set("Content-Type", contentTypeJSON) - req.Header.Set("User-Agent", userAgent) - - 
resp, err := s.doReq(req) - if err != nil { - return errors.Wrapf(err, "send request to %q", url) - } - defer runutil.ExhaustCloseWithLogOnErr(s.logger, resp.Body, "send one alert") - - if resp.StatusCode/100 != 2 { - return errors.Errorf("bad response status %v from %q", resp.Status, url) - } - return nil + level.Warn(s.logger).Log("msg", "failed to send alerts to all alertmanagers", "alerts", string(b)) } diff --git a/pkg/alert/alert_test.go b/pkg/alert/alert_test.go index c2fcc8f91a..8797d9f8f2 100644 --- a/pkg/alert/alert_test.go +++ b/pkg/alert/alert_test.go @@ -1,14 +1,10 @@ package alert import ( - "bytes" "context" - "io/ioutil" - "net/http" "net/url" "sync" "testing" - "time" "github.com/prometheus/prometheus/pkg/labels" @@ -50,98 +46,86 @@ func assertSameHosts(t *testing.T, expected []*url.URL, found []*url.URL) { } } -func TestSender_Send_OK(t *testing.T) { - var ( - expectedHosts = []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}} - spottedHosts []*url.URL - spottedMu sync.Mutex - ) +type fakeDoer struct { + urls []*url.URL + postf func(u *url.URL) error + mtx sync.Mutex + seen []*url.URL +} - okDo := func(req *http.Request) (response *http.Response, e error) { - spottedMu.Lock() - defer spottedMu.Unlock() +func (f *fakeDoer) Endpoints() []*url.URL { + return f.urls +} - spottedHosts = append(spottedHosts, req.URL) +func (f *fakeDoer) Do(ctx context.Context, u *url.URL, b []byte) error { + f.mtx.Lock() + defer f.mtx.Unlock() + f.seen = append(f.seen, u) + if f.postf == nil { + return nil + } + return f.postf(u) +} - return &http.Response{ - Body: ioutil.NopCloser(bytes.NewBuffer(nil)), - StatusCode: http.StatusOK, - }, nil +func TestSenderSendsOk(t *testing.T) { + poster := &fakeDoer{ + urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, } - s := NewSender(nil, nil, func() []*url.URL { return expectedHosts }, okDo, 10*time.Second) + s := NewSender(nil, nil, []AlertmanagerDoer{poster}) s.Send(context.Background(), []*Alert{{}, {}}) - assertSameHosts(t, expectedHosts, spottedHosts) + assertSameHosts(t, poster.urls, poster.seen) - testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[0].Host)))) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[0].Host)))) + testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[0].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[0].Host)))) - testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[1].Host)))) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[1].Host)))) + testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[1].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[1].Host)))) testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.dropped))) } -func TestSender_Send_OneFails(t *testing.T) { - var ( - expectedHosts = []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}} - spottedHosts []*url.URL - spottedMu sync.Mutex - ) - - do := func(req *http.Request) (response *http.Response, e error) { - spottedMu.Lock() - defer spottedMu.Unlock() - - spottedHosts = append(spottedHosts, req.URL) - - if req.Host == expectedHosts[0].Host { - return nil, errors.New("no such host") - } - return &http.Response{ - Body: ioutil.NopCloser(bytes.NewBuffer(nil)), - StatusCode: http.StatusOK, - }, nil +func TestSenderSendsOneFails(t *testing.T) { + poster := &fakeDoer{ + 
urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, + postf: func(u *url.URL) error { + if u.Host == "am1:9090" { + return errors.New("no such host") + } + return nil + }, } - s := NewSender(nil, nil, func() []*url.URL { return expectedHosts }, do, 10*time.Second) + s := NewSender(nil, nil, []AlertmanagerDoer{poster}) s.Send(context.Background(), []*Alert{{}, {}}) - assertSameHosts(t, expectedHosts, spottedHosts) + assertSameHosts(t, poster.urls, poster.seen) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[0].Host)))) - testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[0].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[0].Host)))) + testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[0].Host)))) - testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[1].Host)))) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[1].Host)))) + testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[1].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[1].Host)))) testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.dropped))) } -func TestSender_Send_AllFails(t *testing.T) { - var ( - expectedHosts = []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}} - spottedHosts []*url.URL - spottedMu sync.Mutex - ) - - do := func(req *http.Request) (response *http.Response, e error) { - spottedMu.Lock() - defer spottedMu.Unlock() - - spottedHosts = append(spottedHosts, req.URL) - - return nil, errors.New("no such host") +func TestSenderSendsAllFail(t *testing.T) { + poster := &fakeDoer{ + urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, + postf: func(u *url.URL) error { + return errors.New("no such host") + }, } - s := NewSender(nil, nil, func() []*url.URL { return expectedHosts }, do, 10*time.Second) + s := NewSender(nil, nil, []AlertmanagerDoer{poster}) s.Send(context.Background(), []*Alert{{}, {}}) - assertSameHosts(t, expectedHosts, spottedHosts) + assertSameHosts(t, poster.urls, poster.seen) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[0].Host)))) - testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[0].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[0].Host)))) + testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[0].Host)))) - testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(expectedHosts[1].Host)))) - testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(expectedHosts[1].Host)))) + testutil.Equals(t, 0, int(promtestutil.ToFloat64(s.sent.WithLabelValues(poster.urls[1].Host)))) + testutil.Equals(t, 1, int(promtestutil.ToFloat64(s.errs.WithLabelValues(poster.urls[1].Host)))) testutil.Equals(t, 2, int(promtestutil.ToFloat64(s.dropped))) } diff --git a/pkg/alert/client.go b/pkg/alert/client.go new file mode 100644 index 0000000000..dafb5dfdb8 --- /dev/null +++ b/pkg/alert/client.go @@ -0,0 +1,360 @@ +package alert + +import ( + "bytes" + "context" + "fmt" + "net" + "net/http" + "net/url" + "path" + "strconv" + "strings" + "sync" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + config_util "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + 
"github.com/prometheus/common/version" + "github.com/prometheus/prometheus/discovery/file" + "github.com/prometheus/prometheus/discovery/targetgroup" + "gopkg.in/yaml.v2" + + "github.com/thanos-io/thanos/pkg/discovery/cache" + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/runutil" +) + +const ( + defaultAlertmanagerPort = 9093 + alertPushEndpoint = "/api/v1/alerts" + contentTypeJSON = "application/json" +) + +var userAgent = fmt.Sprintf("Thanos/%s", version.Version) + +type AlertingConfig struct { + Alertmanagers []AlertmanagerConfig `yaml:"alertmanagers"` +} + +// AlertmanagerConfig represents a client to a cluster of Alertmanager endpoints. +// TODO(simonpasquier): add support for API version (v1 or v2). +type AlertmanagerConfig struct { + // HTTP client configuration. + HTTPClientConfig HTTPClientConfig `yaml:"http_config"` + + // List of addresses with DNS prefixes. + StaticAddresses []string `yaml:"static_configs"` + // List of file configurations (our FileSD supports different DNS lookups). + FileSDConfigs []FileSDConfig `yaml:"file_sd_configs"` + + // The URL scheme to use when talking to Alertmanagers. + Scheme string `yaml:"scheme"` + + // Path prefix to add in front of the push endpoint path. + PathPrefix string `yaml:"path_prefix"` + + // The timeout used when sending alerts (default: 10s). + Timeout model.Duration `yaml:"timeout"` +} + +type HTTPClientConfig struct { + // The HTTP basic authentication credentials for the targets. + BasicAuth BasicAuth `yaml:"basic_auth"` + // The bearer token for the targets. + BearerToken string `yaml:"bearer_token"` + // The bearer token file for the targets. + BearerTokenFile string `yaml:"bearer_token_file"` + // HTTP proxy server to use to connect to the targets. + ProxyURL string `yaml:"proxy_url"` + // TLSConfig to use to connect to the targets. + TLSConfig TLSConfig `yaml:"tls_config"` +} + +type TLSConfig struct { + // The CA cert to use for the targets. + CAFile string `yaml:"ca_file"` + // The client cert file for the targets. + CertFile string `yaml:"cert_file"` + // The client key file for the targets. + KeyFile string `yaml:"key_file"` + // Used to verify the hostname for the targets. + ServerName string `yaml:"server_name"` + // Disable target certificate validation. 
+ InsecureSkipVerify bool `yaml:"insecure_skip_verify"` +} + +type BasicAuth struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + PasswordFile string `yaml:"password_file"` +} + +func (b BasicAuth) IsZero() bool { + return b.Username == "" && b.Password == "" && b.PasswordFile == "" +} + +func (c HTTPClientConfig) convert() (config_util.HTTPClientConfig, error) { + httpClientConfig := config_util.HTTPClientConfig{ + BearerToken: config_util.Secret(c.BearerToken), + BearerTokenFile: c.BearerTokenFile, + TLSConfig: config_util.TLSConfig{ + CAFile: c.TLSConfig.CAFile, + CertFile: c.TLSConfig.CertFile, + KeyFile: c.TLSConfig.KeyFile, + ServerName: c.TLSConfig.ServerName, + InsecureSkipVerify: c.TLSConfig.InsecureSkipVerify, + }, + } + if c.ProxyURL != "" { + var proxy config_util.URL + err := yaml.Unmarshal([]byte(c.ProxyURL), &proxy) + if err != nil { + return httpClientConfig, err + } + httpClientConfig.ProxyURL = proxy + } + if !c.BasicAuth.IsZero() { + httpClientConfig.BasicAuth = &config_util.BasicAuth{ + Username: c.BasicAuth.Username, + Password: config_util.Secret(c.BasicAuth.Password), + PasswordFile: c.BasicAuth.PasswordFile, + } + } + return httpClientConfig, httpClientConfig.Validate() +} + +type FileSDConfig struct { + Files []string `yaml:"files"` + RefreshInterval model.Duration `yaml:"refresh_interval"` +} + +func (c FileSDConfig) convert() (file.SDConfig, error) { + var fileSDConfig file.SDConfig + b, err := yaml.Marshal(c) + if err != nil { + return fileSDConfig, err + } + err = yaml.Unmarshal(b, &fileSDConfig) + if err != nil { + return fileSDConfig, err + } + return fileSDConfig, nil +} + +func DefaultAlertmanagerConfig() AlertmanagerConfig { + return AlertmanagerConfig{ + Scheme: "http", + Timeout: model.Duration(time.Second * 10), + StaticAddresses: []string{}, + FileSDConfigs: []FileSDConfig{}, + } +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface. +func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + *c = DefaultAlertmanagerConfig() + type plain AlertmanagerConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} + +// Alertmanager represents an HTTP client that can send alerts to a cluster of Alertmanager endpoints. +type Alertmanager struct { + logger log.Logger + + client *http.Client + timeout time.Duration + scheme string + prefix string + + staticAddresses []string + fileSDCache *cache.Cache + fileDiscoverers []*file.Discovery + + mtx sync.RWMutex + resolved []string +} + +// NewAlertmanager returns a new Alertmanager client. +func NewAlertmanager(logger log.Logger, cfg AlertmanagerConfig) (*Alertmanager, error) { + httpClientConfig, err := cfg.HTTPClientConfig.convert() + if err != nil { + return nil, err + } + client, err := config_util.NewClientFromConfig(httpClientConfig, "alertmanager", false) + if err != nil { + return nil, err + } + + var discoverers []*file.Discovery + for _, sdCfg := range cfg.FileSDConfigs { + fileSDCfg, err := sdCfg.convert() + if err != nil { + return nil, err + } + discoverers = append(discoverers, file.NewDiscovery(&fileSDCfg, logger)) + } + return &Alertmanager{ + logger: logger, + client: client, + scheme: cfg.Scheme, + prefix: cfg.PathPrefix, + timeout: time.Duration(cfg.Timeout), + staticAddresses: cfg.StaticAddresses, + fileSDCache: cache.New(), + fileDiscoverers: discoverers, + }, nil +} + +// LoadAlertmanagerConfigs loads a list of AlertmanagerConfig from YAML data. 
+func LoadAlertingConfig(confYaml []byte) (AlertingConfig, error) { + var cfg AlertingConfig + if err := yaml.UnmarshalStrict(confYaml, &cfg); err != nil { + return cfg, err + } + return cfg, nil +} + +// BuildAlertmanagerConfig initializes and returns an Alertmanager client configuration from a static address. +func BuildAlertmanagerConfig(logger log.Logger, address string, timeout time.Duration) (AlertmanagerConfig, error) { + parsed, err := url.Parse(address) + if err != nil { + return AlertmanagerConfig{}, err + } + + scheme := parsed.Scheme + host := parsed.Host + for _, qType := range []dns.QType{dns.A, dns.SRV, dns.SRVNoA} { + prefix := string(qType) + "+" + if strings.HasPrefix(strings.ToLower(scheme), prefix) { + // Scheme is of the form "+". + scheme = strings.TrimPrefix(scheme, prefix) + host = prefix + parsed.Host + break + } + } + var basicAuth BasicAuth + if parsed.User != nil && parsed.User.String() != "" { + basicAuth.Username = parsed.User.Username() + pw, _ := parsed.User.Password() + basicAuth.Password = pw + } + + return AlertmanagerConfig{ + PathPrefix: parsed.Path, + Scheme: scheme, + StaticAddresses: []string{host}, + Timeout: model.Duration(timeout), + HTTPClientConfig: HTTPClientConfig{ + BasicAuth: basicAuth, + }, + }, nil +} + +// Endpoints returns the list of known Alertmanager endpoints. +func (a *Alertmanager) Endpoints() []*url.URL { + a.mtx.RLock() + defer a.mtx.RUnlock() + var urls []*url.URL + for _, addr := range a.resolved { + urls = append(urls, + &url.URL{ + Scheme: a.scheme, + Host: addr, + Path: path.Join("/", a.prefix, alertPushEndpoint), + }, + ) + } + return urls +} + +// Post sends a POST request to the given URL. +func (a *Alertmanager) Do(ctx context.Context, u *url.URL, b []byte) error { + req, err := http.NewRequest("POST", u.String(), bytes.NewReader(b)) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(ctx, a.timeout) + defer cancel() + req = req.WithContext(ctx) + req.Header.Set("Content-Type", contentTypeJSON) + req.Header.Set("User-Agent", userAgent) + + resp, err := a.client.Do(req) + if err != nil { + return errors.Wrapf(err, "send request to %q", u) + } + defer runutil.ExhaustCloseWithLogOnErr(a.logger, resp.Body, "send one alert") + + if resp.StatusCode/100 != 2 { + return errors.Errorf("bad response status %v from %q", resp.Status, u) + } + return nil +} + +// Discover runs the service to discover target endpoints. +func (a *Alertmanager) Discover(ctx context.Context) { + var wg sync.WaitGroup + ch := make(chan []*targetgroup.Group) + + for _, d := range a.fileDiscoverers { + wg.Add(1) + go func(d *file.Discovery) { + d.Run(ctx, ch) + wg.Done() + }(d) + } + + func() { + for { + select { + case update := <-ch: + // Discoverers sometimes send nil updates so need to check for it to avoid panics. + if update == nil { + continue + } + a.fileSDCache.Update(update) + case <-ctx.Done(): + return + } + } + }() + wg.Wait() +} + +// Update refreshes and resolves the list of targets. +func (a *Alertmanager) Update(ctx context.Context, resolver dns.Resolver) error { + var resolved []string + for _, addr := range append(a.fileSDCache.Addresses(), a.staticAddresses...) { + qtypeAndName := strings.SplitN(addr, "+", 2) + if len(qtypeAndName) != 2 { + // No lookup needed. Add to the list and continue to the next address. + resolved = append(resolved, addr) + continue + } + qtype, name := dns.QType(qtypeAndName[0]), qtypeAndName[1] + + // Get only the host and resolve it if needed. 
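+		// For example, "dns+alertmanager.example.com" is looked up as A records and,
+		// because the address carries no port, the default Alertmanager port 9093 is appended.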
+ host := name + if qtype == dns.A { + if _, _, err := net.SplitHostPort(host); err != nil { + // The host port could be missing. Append the defaultAlertmanagerPort. + host = host + ":" + strconv.Itoa(defaultAlertmanagerPort) + } + } + addrs, err := resolver.Resolve(ctx, host, qtype) + if err != nil { + return errors.Wrap(err, "failed to resolve alertmanager address") + } + resolved = append(resolved, addrs...) + } + a.mtx.Lock() + a.resolved = resolved + a.mtx.Unlock() + return nil +} diff --git a/pkg/alert/client_test.go b/pkg/alert/client_test.go new file mode 100644 index 0000000000..ba4446bfa0 --- /dev/null +++ b/pkg/alert/client_test.go @@ -0,0 +1,84 @@ +package alert + +import ( + "testing" + "time" + + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestBuildAlertmanagerConfiguration(t *testing.T) { + for _, tc := range []struct { + address string + + err bool + expected AlertmanagerConfig + }{ + { + address: "http://localhost:9093", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"localhost:9093"}, + Scheme: "http", + }, + }, + { + address: "https://am.example.com", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"am.example.com"}, + Scheme: "https", + }, + }, + { + address: "dns+http://localhost:9093", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"dns+localhost:9093"}, + Scheme: "http", + }, + }, + { + address: "dnssrv+http://localhost", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"dnssrv+localhost"}, + Scheme: "http", + }, + }, + { + address: "ssh+http://localhost", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"localhost"}, + Scheme: "ssh+http", + }, + }, + { + address: "dns+https://localhost/path/prefix/", + expected: AlertmanagerConfig{ + StaticAddresses: []string{"dns+localhost"}, + Scheme: "https", + PathPrefix: "/path/prefix/", + }, + }, + { + address: "http://user:pass@localhost:9093", + expected: AlertmanagerConfig{ + HTTPClientConfig: HTTPClientConfig{ + BasicAuth: BasicAuth{ + Username: "user", + Password: "pass", + }, + }, + StaticAddresses: []string{"localhost:9093"}, + Scheme: "http", + }, + }, + } { + t.Run(tc.address, func(t *testing.T) { + cfg, err := BuildAlertmanagerConfig(nil, tc.address, time.Duration(0)) + if tc.err { + testutil.NotOk(t, err) + return + } + + testutil.Equals(t, tc.expected, cfg) + }) + } +} diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 9051848a38..2d5cec20e1 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/objstore/azure" "github.com/thanos-io/thanos/pkg/objstore/client" "github.com/thanos-io/thanos/pkg/objstore/cos" @@ -71,13 +72,20 @@ func main() { os.Exit(1) } } + + alertmgrCfg := alert.DefaultAlertmanagerConfig() + alertmgrCfg.FileSDConfigs = []alert.FileSDConfig{alert.FileSDConfig{}} + if err := generate(alert.AlertingConfig{Alertmanagers: []alert.AlertmanagerConfig{alertmgrCfg}}, "rule_alerting", *outputDir); err != nil { + level.Error(logger).Log("msg", "failed to generate", "type", "rule_alerting", "err", err) + os.Exit(1) + } logger.Log("msg", "success") } func generate(obj interface{}, typ string, outputDir string) error { // We forbid omitempty option. This is for simplification for doc generation. 
if err := checkForOmitEmptyTagOption(obj); err != nil { - return err + return errors.Wrap(err, "invalid type") } out, err := yaml.Marshal(obj) @@ -95,15 +103,15 @@ func checkForOmitEmptyTagOption(obj interface{}) error { func checkForOmitEmptyTagOptionRec(v reflect.Value) error { switch v.Kind() { case reflect.Struct: - for i := 0; i < v.NumField(); i += 1 { + for i := 0; i < v.NumField(); i++ { tags, err := structtag.Parse(string(v.Type().Field(i).Tag)) if err != nil { - return err + return errors.Wrapf(err, "%s: failed to parse tag %q", v.Type().Field(i).Name, v.Type().Field(i).Tag) } tag, err := tags.Get("yaml") if err != nil { - return err + return errors.Wrapf(err, "%s: failed to get tag %q", v.Type().Field(i).Name, v.Type().Field(i).Tag) } for _, opts := range tag.Options { @@ -113,16 +121,12 @@ func checkForOmitEmptyTagOptionRec(v reflect.Value) error { } if err := checkForOmitEmptyTagOptionRec(v.Field(i)); err != nil { - return err + return errors.Wrapf(err, "%s", v.Type().Field(i).Name) } } case reflect.Ptr: - if !v.IsValid() { - return errors.New("nil pointers are not allowed in configuration.") - } - - return errors.New("nil pointers are not allowed in configuration.") + return errors.New("nil pointers are not allowed in configuration") case reflect.Interface: return checkForOmitEmptyTagOptionRec(v.Elem()) From 5673d7466a9846d47734ad2aa55a0d17420e5bd4 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 9 Dec 2019 12:14:51 +0100 Subject: [PATCH 02/10] Fail hard when --alertmanagers.url and --alertmanagers.config flags are both defined Signed-off-by: Simon Pasquier --- cmd/thanos/rule.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 6b451e72b2..03fd85b416 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -297,7 +297,7 @@ func runRule( ) if len(alertmgrsConfigYAML) > 0 { if len(alertmgrURLs) != 0 { - level.Warn(logger).Log("msg", "ignoring --alertmanagers.url flag because --alertmanagers.config* flag is also defined") + return errors.New("--alertmanagers.url and --alertmanagers.config* flags cannot be defined at the same time") } alertingcfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML) if err != nil { From a245cc22090af5ceaf84aa1ebd4c033b5a34cbd4 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 9 Dec 2019 12:20:05 +0100 Subject: [PATCH 03/10] Update CHANGELOG.md Signed-off-by: Simon Pasquier --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c3249140ed..4c66b9a544 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44 - [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering. +### Added + +- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information. 
+ ## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03 ### Added From 282681efa73bf4fcc65b1ffcddbe3b99d68d2d22 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 9 Dec 2019 13:05:40 +0100 Subject: [PATCH 04/10] Move tests from cmd/thanos to pkg/alert Signed-off-by: Simon Pasquier --- cmd/thanos/rule_test.go | 98 --------------------------------------- pkg/alert/client_test.go | 99 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 98 deletions(-) diff --git a/cmd/thanos/rule_test.go b/cmd/thanos/rule_test.go index f4d801d747..2abd38cf6a 100644 --- a/cmd/thanos/rule_test.go +++ b/cmd/thanos/rule_test.go @@ -1,12 +1,8 @@ package main import ( - "context" - "net/url" "testing" - "github.com/pkg/errors" - "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -49,97 +45,3 @@ func Test_parseFlagLabels(t *testing.T) { testutil.Equals(t, err != nil, td.expectErr) } } - -func TestRule_AlertmanagerResolveWithoutPort(t *testing.T) { - mockResolver := mockResolver{ - resultIPs: map[string][]string{ - "alertmanager.com:9093": {"1.1.1.1:9300"}, - }, - } - am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}} - - ctx := context.TODO() - err := am.update(ctx) - testutil.Ok(t, err) - - expected := []*url.URL{ - { - Scheme: "http", - Host: "1.1.1.1:9300", - }, - } - gotURLs := am.get() - testutil.Equals(t, expected, gotURLs) -} - -func TestRule_AlertmanagerResolveWithPort(t *testing.T) { - mockResolver := mockResolver{ - resultIPs: map[string][]string{ - "alertmanager.com:19093": {"1.1.1.1:9300"}, - }, - } - am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}} - - ctx := context.TODO() - err := am.update(ctx) - testutil.Ok(t, err) - - expected := []*url.URL{ - { - Scheme: "http", - Host: "1.1.1.1:9300", - }, - } - gotURLs := am.get() - testutil.Equals(t, expected, gotURLs) -} - -type mockResolver struct { - resultIPs map[string][]string - err error -} - -func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) { - if m.err != nil { - return nil, m.err - } - if res, ok := m.resultIPs[name]; ok { - return res, nil - } - return nil, errors.Errorf("mockResolver not found response for name: %s", name) -} - -func Test_ParseAlertmanagerAddress(t *testing.T) { - var tData = []struct { - address string - expectQueryType dns.QType - expectUrl *url.URL - expectError error - }{ - { - address: "http://user:pass+word@foo.bar:3289", - expectQueryType: dns.QType(""), - expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")}, - expectError: nil, - }, - { - address: "dnssrvnoa+http://user:pass+word@foo.bar:3289", - expectQueryType: dns.QType("dnssrvnoa"), - expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http", User: url.UserPassword("user", "pass+word")}, - expectError: nil, - }, - { - address: "foo+bar+http://foo.bar:3289", - expectQueryType: dns.QType("foo+bar"), - expectUrl: &url.URL{Host: "foo.bar:3289", Scheme: "http"}, - expectError: nil, - }, - } - - for _, d := range tData { - q, u, e := parseAlertmanagerAddress(d.address) - testutil.Equals(t, d.expectError, e) - testutil.Equals(t, d.expectUrl, u) - testutil.Equals(t, d.expectQueryType, q) - } -} diff --git a/pkg/alert/client_test.go b/pkg/alert/client_test.go index ba4446bfa0..060e218e05 100644 --- a/pkg/alert/client_test.go +++ b/pkg/alert/client_test.go @@ -1,9 +1,14 @@ package alert 
import ( + "context" + "strings" "testing" "time" + "github.com/pkg/errors" + + "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -82,3 +87,97 @@ func TestBuildAlertmanagerConfiguration(t *testing.T) { }) } } + +type mockEntry struct { + name string + qtype dns.QType +} + +type mockResolver struct { + entries map[mockEntry][]string + err error +} + +func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) { + if m.err != nil { + return nil, m.err + } + if res, ok := m.entries[mockEntry{name: name, qtype: qtype}]; ok { + return res, nil + } + return nil, errors.Errorf("mockResolver not found response for name: %s", name) +} + +func TestUpdate(t *testing.T) { + for _, tc := range []struct { + cfg AlertmanagerConfig + resolver mockResolver + + resolved []string + err bool + }{ + { + cfg: AlertmanagerConfig{ + StaticAddresses: []string{"dns+alertmanager.example.com:9095"}, + }, + resolver: mockResolver{ + entries: map[mockEntry][]string{ + mockEntry{name: "alertmanager.example.com:9095", qtype: dns.A}: []string{"1.1.1.1:9095", "2.2.2.2:9095"}, + }, + }, + resolved: []string{"1.1.1.1:9095", "2.2.2.2:9095"}, + }, + { + cfg: AlertmanagerConfig{ + StaticAddresses: []string{"dns+alertmanager.example.com"}, + }, + resolver: mockResolver{ + entries: map[mockEntry][]string{ + mockEntry{name: "alertmanager.example.com:9093", qtype: dns.A}: []string{"1.1.1.1:9093", "2.2.2.2:9093"}, + }, + }, + resolved: []string{"1.1.1.1:9093", "2.2.2.2:9093"}, + }, + { + cfg: AlertmanagerConfig{ + StaticAddresses: []string{"alertmanager.example.com:9096"}, + }, + resolved: []string{"alertmanager.example.com:9096"}, + }, + { + cfg: AlertmanagerConfig{ + StaticAddresses: []string{"dnssrv+_web._tcp.alertmanager.example.com"}, + }, + resolver: mockResolver{ + entries: map[mockEntry][]string{ + mockEntry{name: "_web._tcp.alertmanager.example.com", qtype: dns.SRV}: []string{"1.1.1.1:9097", "2.2.2.2:9097"}, + }, + }, + resolved: []string{"1.1.1.1:9097", "2.2.2.2:9097"}, + }, + { + cfg: AlertmanagerConfig{ + StaticAddresses: []string{"dnssrv+_web._tcp.notfound.example.com"}, + }, + resolver: mockResolver{ + entries: map[mockEntry][]string{}, + }, + err: true, + }, + } { + t.Run(strings.Join(tc.cfg.StaticAddresses, ","), func(t *testing.T) { + am, err := NewAlertmanager(nil, tc.cfg) + testutil.Ok(t, err) + ctx := context.Background() + err = am.Update(ctx, &tc.resolver) + if tc.err { + t.Logf("%v", err) + testutil.NotOk(t, err) + return + } + + testutil.Equals(t, tc.resolved, am.resolved) + }) + } + +} From 16f18581201ca7f1db6eae4ea13eedbfd567112d Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Tue, 10 Dec 2019 15:24:23 +0100 Subject: [PATCH 05/10] Add end-to-end for Alertmanager file SD Signed-off-by: Simon Pasquier --- cmd/thanos/rule.go | 6 +- docs/components/rule.md | 3 + pkg/alert/client.go | 5 +- test/e2e/rule_test.go | 167 ++++++++++++++++++++++++++++++++++++++-- test/e2e/spinup_test.go | 6 +- 5 files changed, 176 insertions(+), 11 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 03fd85b416..0b756e5f25 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -83,6 +83,8 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { Strings() alertmgrsTimeout := cmd.Flag("alertmanagers.send-timeout", "Timeout for sending alerts to Alertmanager").Default("10s").Duration() alertmgrsConfig := extflag.RegisterPathOrContent(cmd, "alertmanagers.config", "YAML file that contains alerting configuration. 
See format details: https://thanos.io/components/rule.md/#configuration. If defined, it takes precedence over the '--alertmanagers.url' and '--alertmanagers.send-timeout' flags.", false) + alertmgrsDNSSDInterval := modelDuration(cmd.Flag("alertmanagers.sd-dns-interval", "Interval between DNS resolutions of Alertmanager hosts."). + Default("30s")) alertQueryURL := cmd.Flag("alert.query-url", "The external Thanos Query URL that would be set in all alerts 'Source' field").String() @@ -156,6 +158,7 @@ func registerRule(m map[string]setupFunc, app *kingpin.Application) { *alertmgrs, *alertmgrsTimeout, alertmgrsConfig, + time.Duration(*alertmgrsDNSSDInterval), *grpcBindAddr, time.Duration(*grpcGracePeriod), *grpcCert, @@ -194,6 +197,7 @@ func runRule( alertmgrURLs []string, alertmgrsTimeout time.Duration, alertmgrsConfig *extflag.PathOrContent, + alertmgrsDNSSDInterval time.Duration, grpcBindAddr string, grpcGracePeriod time.Duration, grpcCert string, @@ -403,7 +407,7 @@ func runRule( }) g.Add(func() error { - return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + return runutil.Repeat(alertmgrsDNSSDInterval, ctx.Done(), func() error { if err := am.Update(ctx, resolver); err != nil { level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err) alertMngrAddrResolutionErrors.Inc() diff --git a/docs/components/rule.md b/docs/components/rule.md index 7d6f46586a..565223681f 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -226,6 +226,9 @@ Flags: If defined, it takes precedence over the '--alertmanagers.url' and '--alertmanagers.send-timeout' flags. + --alertmanagers.sd-dns-interval=30s + Interval between DNS resolutions of + Alertmanager hosts. --alert.query-url=ALERT.QUERY-URL The external Thanos Query URL that would be set in all alerts 'Source' field diff --git a/pkg/alert/client.go b/pkg/alert/client.go index dafb5dfdb8..022fdf1749 100644 --- a/pkg/alert/client.go +++ b/pkg/alert/client.go @@ -14,6 +14,7 @@ import ( "time" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/pkg/errors" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -331,9 +332,10 @@ func (a *Alertmanager) Discover(ctx context.Context) { func (a *Alertmanager) Update(ctx context.Context, resolver dns.Resolver) error { var resolved []string for _, addr := range append(a.fileSDCache.Addresses(), a.staticAddresses...) { + level.Debug(a.logger).Log("msg", "resolving address", "addr", addr) qtypeAndName := strings.SplitN(addr, "+", 2) if len(qtypeAndName) != 2 { - // No lookup needed. Add to the list and continue to the next address. + level.Debug(a.logger).Log("msg", "no lookup needed", "addr", addr) resolved = append(resolved, addr) continue } @@ -351,6 +353,7 @@ func (a *Alertmanager) Update(ctx context.Context, resolver dns.Resolver) error if err != nil { return errors.Wrap(err, "failed to resolve alertmanager address") } + level.Debug(a.logger).Log("msg", "address resolved", "addr", addr, "resolved", strings.Join(addrs, ",")) resolved = append(resolved, addrs...) 
} a.mtx.Lock() diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 5784938aa8..3c88274f0b 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -15,8 +15,12 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/discovery/targetgroup" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/timestamp" + yaml "gopkg.in/yaml.v2" + + "github.com/thanos-io/thanos/pkg/alert" "github.com/thanos-io/thanos/pkg/promclient" rapi "github.com/thanos-io/thanos/pkg/rule/api" "github.com/thanos-io/thanos/pkg/runutil" @@ -62,10 +66,153 @@ func createRuleFiles(t *testing.T, dir string) { } } +func serializeAlertingConfiguration(t *testing.T, cfg alert.AlertmanagerConfig) []byte { + t.Helper() + amCfg := alert.AlertingConfig{ + Alertmanagers: []alert.AlertmanagerConfig{cfg}, + } + b, err := yaml.Marshal(&amCfg) + if err != nil { + t.Errorf("failed to serialize alerting configuration: %v", err) + } + return b +} + +func writeAlertmanagerFileSD(t *testing.T, path string, addrs ...string) { + group := targetgroup.Group{Targets: []model.LabelSet{}} + for _, addr := range addrs { + group.Targets = append(group.Targets, model.LabelSet{model.LabelName(model.AddressLabel): model.LabelValue(addr)}) + } + + b, err := yaml.Marshal([]*targetgroup.Group{&group}) + if err != nil { + t.Errorf("failed to serialize file SD configuration: %v", err) + return + } + + err = ioutil.WriteFile(path+".tmp", b, 0660) + if err != nil { + t.Errorf("failed to write file SD configuration: %v", err) + return + } + + err = os.Rename(path+".tmp", path) + testutil.Ok(t, err) +} + +func TestRuleAlertmanagerFileSD(t *testing.T) { + a := newLocalAddresser() + + am := alertManager(a.New()) + amDir, err := ioutil.TempDir("", "am") + defer os.RemoveAll(amDir) + testutil.Ok(t, err) + amCfg := serializeAlertingConfiguration( + t, + alert.AlertmanagerConfig{ + FileSDConfigs: []alert.FileSDConfig{ + alert.FileSDConfig{ + Files: []string{filepath.Join(amDir, "*.yaml")}, + RefreshInterval: model.Duration(time.Hour), + }, + }, + Scheme: "http", + Timeout: model.Duration(time.Second), + }, + ) + + rulesDir, err := ioutil.TempDir("", "rules") + defer os.RemoveAll(rulesDir) + testutil.Ok(t, err) + createRuleFiles(t, rulesDir) + + qAddr := a.New() + r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) + q := querier(qAddr, a.New(), []address{r.GRPC}, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + exit, err := e2eSpinup(t, ctx, am, q, r) + if err != nil { + t.Errorf("spinup failed: %v", err) + cancel() + return + } + + defer func() { + cancel() + <-exit + }() + + // Wait for a couple of evaluations. + testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { + select { + case <-exit: + cancel() + return nil + default: + } + + // The time series written for the firing alerting rule must be queryable. 
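+			// count_over_time(ALERTS[1m]) counts how many ALERTS samples the alerting
+			// rule has written over the last minute, so requiring more than 2 ensures
+			// that several evaluations have completed.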
+ res, warnings, err := promclient.QueryInstant(ctx, nil, urlParse(t, q.HTTP.URL()), "max(count_over_time(ALERTS[1m])) > 2", time.Now(), promclient.QueryOptions{ + Deduplicate: false, + }) + if err != nil { + return err + } + if len(warnings) > 0 { + return errors.Errorf("unexpected warnings %s", warnings) + } + if len(res) == 0 { + return errors.Errorf("empty result") + } + + alrts, err := queryAlertmanagerAlerts(ctx, am.HTTP.URL()) + if err != nil { + return err + } + if len(alrts) != 0 { + return errors.Errorf("unexpected alerts length %d", len(alrts)) + } + + return nil + })) + + // Update the Alertmanager file service discovery configuration. + writeAlertmanagerFileSD(t, filepath.Join(amDir, "targets.yaml"), am.HTTP.HostPort()) + + // Verify that alerts are received by Alertmanager. + testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { + select { + case <-exit: + cancel() + return nil + default: + } + alrts, err := queryAlertmanagerAlerts(ctx, am.HTTP.URL()) + if err != nil { + return err + } + if len(alrts) == 0 { + return errors.Errorf("expecting alerts") + } + + return nil + })) +} + func TestRule(t *testing.T) { a := newLocalAddresser() am := alertManager(a.New()) + amCfg := serializeAlertingConfiguration( + t, + alert.AlertmanagerConfig{ + StaticAddresses: []string{am.HTTP.HostPort()}, + Scheme: "http", + Timeout: model.Duration(time.Second), + }, + ) + qAddr := a.New() rulesDir, err := ioutil.TempDir("", "rules") @@ -73,8 +220,8 @@ func TestRule(t *testing.T) { testutil.Ok(t, err) createRuleFiles(t, rulesDir) - r1 := rule(a.New(), a.New(), rulesDir, am.HTTP, []address{qAddr}, nil) - r2 := rule(a.New(), a.New(), rulesDir, am.HTTP, nil, []address{qAddr}) + r1 := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) + r2 := rule(a.New(), a.New(), rulesDir, amCfg, nil, []address{qAddr}) q := querier(qAddr, a.New(), []address{r1.GRPC, r2.GRPC}, nil) @@ -282,19 +429,25 @@ func (a *failingStoreAPI) LabelValues(context.Context, *storepb.LabelValuesReque // Test Ruler behaviour on different storepb.PartialResponseStrategy when having partial response from single `failingStoreAPI`. 
func TestRulePartialResponse(t *testing.T) { - dir, err := ioutil.TempDir("", "test_rulepartial_response") - testutil.Ok(t, err) - defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - a := newLocalAddresser() qAddr := a.New() f := fakeStoreAPI(a.New(), &failingStoreAPI{}) am := alertManager(a.New()) + amCfg := serializeAlertingConfiguration( + t, + alert.AlertmanagerConfig{ + StaticAddresses: []string{am.HTTP.HostPort()}, + Scheme: "http", + Timeout: model.Duration(time.Second), + }, + ) + rulesDir, err := ioutil.TempDir("", "rules") defer os.RemoveAll(rulesDir) testutil.Ok(t, err) - r := rule(a.New(), a.New(), rulesDir, am.HTTP, []address{qAddr}, nil) + + r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) q := querier(qAddr, a.New(), []address{r.GRPC, f.GRPC}, nil) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 4eea79e5fc..721eb70403 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -305,7 +305,7 @@ receivers: } } -func rule(http, grpc address, ruleDir string, am address, queryAddresses []address, queryFileSDAddresses []address) *serverScheduler { +func rule(http, grpc address, ruleDir string, amCfg []byte, queryAddresses []address, queryFileSDAddresses []address) *serverScheduler { return &serverScheduler{ HTTP: http, GRPC: grpc, @@ -317,12 +317,14 @@ func rule(http, grpc address, ruleDir string, am address, queryAddresses []addre "--data-dir", filepath.Join(workDir, "data"), "--rule-file", filepath.Join(ruleDir, "*.yaml"), "--eval-interval", "1s", - "--alertmanagers.url", am.URL(), + "--alertmanagers.config", string(amCfg), + "--alertmanagers.sd-dns-interval", "5s", "--grpc-address", grpc.HostPort(), "--grpc-grace-period", "0s", "--http-address", http.HostPort(), "--log.level", "debug", "--query.sd-dns-interval", "5s", + "--resend-delay", "5s", } for _, addr := range queryAddresses { From 351841d08344bd0b0df4e673d888198f94eaf754 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Wed, 11 Dec 2019 08:54:09 +0100 Subject: [PATCH 06/10] test/e2e: add test with different alerting HTTP clients Signed-off-by: Simon Pasquier --- test/e2e/rule_test.go | 167 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 165 insertions(+), 2 deletions(-) diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 3c88274f0b..de28a5aa9a 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -1,15 +1,19 @@ package e2e_test import ( + "bytes" "context" "encoding/json" + "encoding/pem" "fmt" "io/ioutil" "math" "net/http" + "net/http/httptest" "os" "path/filepath" "sort" + "sync" "testing" "time" @@ -66,10 +70,10 @@ func createRuleFiles(t *testing.T, dir string) { } } -func serializeAlertingConfiguration(t *testing.T, cfg alert.AlertmanagerConfig) []byte { +func serializeAlertingConfiguration(t *testing.T, cfg ...alert.AlertmanagerConfig) []byte { t.Helper() amCfg := alert.AlertingConfig{ - Alertmanagers: []alert.AlertmanagerConfig{cfg}, + Alertmanagers: cfg, } b, err := yaml.Marshal(&amCfg) if err != nil { @@ -100,6 +104,165 @@ func writeAlertmanagerFileSD(t *testing.T, path string, addrs ...string) { testutil.Ok(t, err) } +type mockAlertmanager struct { + path string + token string + mtx sync.Mutex + alerts []*model.Alert + lastError error +} + +func newMockAlertmanager(path string, token string) *mockAlertmanager { + return &mockAlertmanager{ + path: path, + token: token, + alerts: make([]*model.Alert, 0), + } +} + +func (m *mockAlertmanager) 
setLastError(err error) { + m.mtx.Lock() + defer m.mtx.Unlock() + m.lastError = err +} + +func (m *mockAlertmanager) LastError() error { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.lastError +} + +func (m *mockAlertmanager) Alerts() []*model.Alert { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.alerts +} + +func (m *mockAlertmanager) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + if req.Method != "POST" { + m.setLastError(errors.Errorf("invalid method: %s", req.Method)) + resp.WriteHeader(http.StatusMethodNotAllowed) + return + } + + if req.URL.Path != m.path { + m.setLastError(errors.Errorf("invalid path: %s", req.URL.Path)) + resp.WriteHeader(http.StatusNotFound) + return + } + + if m.token != "" { + auth := req.Header.Get("Authorization") + if auth != fmt.Sprintf("Bearer %s", m.token) { + m.setLastError(errors.Errorf("invalid auth: %s", req.URL.Path)) + resp.WriteHeader(http.StatusForbidden) + return + } + } + + b, err := ioutil.ReadAll(req.Body) + if err != nil { + m.setLastError(err) + resp.WriteHeader(http.StatusInternalServerError) + return + } + + var alerts []*model.Alert + if err := json.Unmarshal(b, &alerts); err != nil { + m.setLastError(err) + resp.WriteHeader(http.StatusInternalServerError) + return + } + + m.mtx.Lock() + m.alerts = append(m.alerts, alerts...) + m.mtx.Unlock() +} + +func TestRuleAlertmanagerHTTPClient(t *testing.T) { + a := newLocalAddresser() + + // Plain HTTP with a prefix. + handler1 := newMockAlertmanager("/prefix/api/v1/alerts", "") + srv1 := httptest.NewServer(handler1) + defer srv1.Close() + // HTTPS with authentication. + handler2 := newMockAlertmanager("/api/v1/alerts", "secret") + srv2 := httptest.NewTLSServer(handler2) + defer srv2.Close() + + // Write the server's certificate to disk for the alerting configuration. 
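+	// httptest.NewTLSServer generates a self-signed certificate; exporting it as a PEM file
+	// lets the Ruler trust the server through the TLS CA file option.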
+ tlsDir, err := ioutil.TempDir("", "tls") + defer os.RemoveAll(tlsDir) + testutil.Ok(t, err) + var out bytes.Buffer + err = pem.Encode(&out, &pem.Block{Type: "CERTIFICATE", Bytes: srv2.TLS.Certificates[0].Certificate[0]}) + testutil.Ok(t, err) + caFile := filepath.Join(tlsDir, "ca.crt") + err = ioutil.WriteFile(caFile, out.Bytes(), 0640) + testutil.Ok(t, err) + + amCfg := serializeAlertingConfiguration( + t, + alert.AlertmanagerConfig{ + StaticAddresses: []string{srv1.Listener.Addr().String()}, + Scheme: "http", + Timeout: model.Duration(time.Second), + PathPrefix: "/prefix/", + }, + alert.AlertmanagerConfig{ + HTTPClientConfig: alert.HTTPClientConfig{ + TLSConfig: alert.TLSConfig{ + CAFile: caFile, + }, + BearerToken: "secret", + }, + StaticAddresses: []string{srv2.Listener.Addr().String()}, + Scheme: "https", + Timeout: model.Duration(time.Second), + }, + ) + + rulesDir, err := ioutil.TempDir("", "rules") + defer os.RemoveAll(rulesDir) + testutil.Ok(t, err) + createRuleFiles(t, rulesDir) + + qAddr := a.New() + r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) + q := querier(qAddr, a.New(), []address{r.GRPC}, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + exit, err := e2eSpinup(t, ctx, q, r) + if err != nil { + t.Errorf("spinup failed: %v", err) + cancel() + return + } + + defer func() { + cancel() + <-exit + }() + + testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { + select { + case <-exit: + cancel() + return nil + default: + } + + for i, am := range []*mockAlertmanager{handler1, handler2} { + if len(am.Alerts()) == 0 { + return errors.Errorf("no alert received from handler%d, last error: %v", i, am.LastError()) + } + } + + return nil + })) +} + func TestRuleAlertmanagerFileSD(t *testing.T) { a := newLocalAddresser() From 075283cea73ca550044d754c6b167b041ea02fc5 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Wed, 11 Dec 2019 09:45:48 +0100 Subject: [PATCH 07/10] Fix panic in pkg/alert/client_test.go Signed-off-by: Simon Pasquier --- pkg/alert/client.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/alert/client.go b/pkg/alert/client.go index 022fdf1749..fbd216a24f 100644 --- a/pkg/alert/client.go +++ b/pkg/alert/client.go @@ -183,6 +183,10 @@ type Alertmanager struct { // NewAlertmanager returns a new Alertmanager client. 
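+// A nil logger is replaced with a no-op logger so that callers (such as the unit tests)
+// can pass nil without causing a panic.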
func NewAlertmanager(logger log.Logger, cfg AlertmanagerConfig) (*Alertmanager, error) { + if logger == nil { + logger = log.NewNopLogger() + } + httpClientConfig, err := cfg.HTTPClientConfig.convert() if err != nil { return nil, err From 65f00def54a3e9fa79bfe90753622bb16b2e70cb Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Mon, 16 Dec 2019 15:55:58 +0100 Subject: [PATCH 08/10] Address Bartek's comments Signed-off-by: Simon Pasquier --- CHANGELOG.md | 5 ++--- cmd/thanos/rule.go | 16 +++++++------- docs/components/rule.md | 4 +++- pkg/alert/alert.go | 19 ++++++++++------- pkg/alert/alert_test.go | 19 +++++++++-------- pkg/alert/client.go | 11 ++++------ pkg/alert/client_test.go | 4 ++++ test/e2e/rule_test.go | 45 ++++++++++++++++------------------------ 8 files changed, 60 insertions(+), 63 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4c66b9a544..35cdc6f7fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,11 +18,10 @@ We use *breaking* word for marking changes that are not backward compatible (rel ### Added - [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44 - [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering. - -### Added - - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information. +- [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts. + ## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03 ### Added diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 0b756e5f25..2a0e9e9969 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -296,14 +296,14 @@ func runRule( return err } var ( - alertingcfg alert.AlertingConfig + alertingCfg alert.AlertingConfig alertmgrs []*alert.Alertmanager ) if len(alertmgrsConfigYAML) > 0 { if len(alertmgrURLs) != 0 { return errors.New("--alertmanagers.url and --alertmanagers.config* flags cannot be defined at the same time") } - alertingcfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML) + alertingCfg, err = alert.LoadAlertingConfig(alertmgrsConfigYAML) if err != nil { return err } @@ -314,13 +314,13 @@ func runRule( if err != nil { return err } - alertingcfg.Alertmanagers = append(alertingcfg.Alertmanagers, cfg) + alertingCfg.Alertmanagers = append(alertingCfg.Alertmanagers, cfg) } } - if len(alertingcfg.Alertmanagers) == 0 { + if len(alertingCfg.Alertmanagers) == 0 { level.Warn(logger).Log("msg", "no alertmanager configured") } - for _, cfg := range alertingcfg.Alertmanagers { + for _, cfg := range alertingCfg.Alertmanagers { am, err := alert.NewAlertmanager(logger, cfg) if err != nil { return err @@ -421,11 +421,11 @@ func runRule( } // Run the alert sender. 
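+	// The sender fans every alert batch out to all configured Alertmanager clients and their resolved endpoints.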
{ - doers := make([]alert.AlertmanagerDoer, len(alertmgrs)) + clients := make([]alert.AlertmanagerClient, len(alertmgrs)) for i := range alertmgrs { - doers[i] = alertmgrs[i] + clients[i] = alertmgrs[i] } - sdr := alert.NewSender(logger, reg, doers) + sdr := alert.NewSender(logger, reg, clients) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { diff --git a/docs/components/rule.md b/docs/components/rule.md index 565223681f..7f8807a906 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -290,7 +290,9 @@ Flags: ### Alertmanager -The configuration format supported by the `--alertmanagers.config` and `--alertmanagers.config-file` flags is the following: +The `--alertmanagers.config` and `--alertmanagers.config-file` flags allow specifying multiple Alertmanagers. Those entries are treated as a single HA group. This means that alert send failure is claimed only if the Ruler fails to send to all instances. + +The configuration format is the following: [embedmd]:# (../flags/config_rule_alerting.txt yaml) ```yaml diff --git a/pkg/alert/alert.go b/pkg/alert/alert.go index c443f71555..2a9e2e8ab8 100644 --- a/pkg/alert/alert.go +++ b/pkg/alert/alert.go @@ -2,9 +2,11 @@ package alert import ( + "bytes" "context" "encoding/json" "fmt" + "io" "net/url" "sync" "sync/atomic" @@ -238,15 +240,15 @@ func (q *Queue) Push(alerts []*Alert) { } } -type AlertmanagerDoer interface { +type AlertmanagerClient interface { Endpoints() []*url.URL - Do(context.Context, *url.URL, []byte) error + Do(context.Context, *url.URL, io.Reader) error } // Sender sends notifications to a dynamic set of alertmanagers. type Sender struct { logger log.Logger - alertmanagers []AlertmanagerDoer + alertmanagers []AlertmanagerClient sent *prometheus.CounterVec errs *prometheus.CounterVec @@ -259,7 +261,7 @@ type Sender struct { func NewSender( logger log.Logger, reg prometheus.Registerer, - alertmanagers []AlertmanagerDoer, + alertmanagers []AlertmanagerClient, ) *Sender { if logger == nil { logger = log.NewNopLogger() @@ -294,7 +296,7 @@ func NewSender( return s } -// Send an alert batch to all given Alertmanager client. +// Send an alert batch to all given Alertmanager clients. // TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660. 
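+// Send does not return an error; delivery failures are logged and tracked via the per-endpoint error counter.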
func (s *Sender) Send(ctx context.Context, alerts []*Alert) { if len(alerts) == 0 { @@ -313,17 +315,18 @@ func (s *Sender) Send(ctx context.Context, alerts []*Alert) { for _, amc := range s.alertmanagers { for _, u := range amc.Endpoints() { wg.Add(1) - go func(amc AlertmanagerDoer, u *url.URL) { + go func(amc AlertmanagerClient, u *url.URL) { defer wg.Done() level.Debug(s.logger).Log("msg", "sending alerts", "alertmanager", u.Host, "numAlerts", len(alerts)) start := time.Now() - if err := amc.Do(ctx, u, b); err != nil { + if err := amc.Do(ctx, u, bytes.NewReader(b)); err != nil { level.Warn(s.logger).Log( "msg", "sending alerts failed", "alertmanager", u.Host, "numAlerts", len(alerts), - "err", err) + "err", err, + ) s.errs.WithLabelValues(u.Host).Inc() return } diff --git a/pkg/alert/alert_test.go b/pkg/alert/alert_test.go index 8797d9f8f2..7e1e851d80 100644 --- a/pkg/alert/alert_test.go +++ b/pkg/alert/alert_test.go @@ -2,6 +2,7 @@ package alert import ( "context" + "io" "net/url" "sync" "testing" @@ -46,18 +47,18 @@ func assertSameHosts(t *testing.T, expected []*url.URL, found []*url.URL) { } } -type fakeDoer struct { +type fakeClient struct { urls []*url.URL postf func(u *url.URL) error mtx sync.Mutex seen []*url.URL } -func (f *fakeDoer) Endpoints() []*url.URL { +func (f *fakeClient) Endpoints() []*url.URL { return f.urls } -func (f *fakeDoer) Do(ctx context.Context, u *url.URL, b []byte) error { +func (f *fakeClient) Do(ctx context.Context, u *url.URL, r io.Reader) error { f.mtx.Lock() defer f.mtx.Unlock() f.seen = append(f.seen, u) @@ -68,10 +69,10 @@ func (f *fakeDoer) Do(ctx context.Context, u *url.URL, b []byte) error { } func TestSenderSendsOk(t *testing.T) { - poster := &fakeDoer{ + poster := &fakeClient{ urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, } - s := NewSender(nil, nil, []AlertmanagerDoer{poster}) + s := NewSender(nil, nil, []AlertmanagerClient{poster}) s.Send(context.Background(), []*Alert{{}, {}}) @@ -86,7 +87,7 @@ func TestSenderSendsOk(t *testing.T) { } func TestSenderSendsOneFails(t *testing.T) { - poster := &fakeDoer{ + poster := &fakeClient{ urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, postf: func(u *url.URL) error { if u.Host == "am1:9090" { @@ -95,7 +96,7 @@ func TestSenderSendsOneFails(t *testing.T) { return nil }, } - s := NewSender(nil, nil, []AlertmanagerDoer{poster}) + s := NewSender(nil, nil, []AlertmanagerClient{poster}) s.Send(context.Background(), []*Alert{{}, {}}) @@ -110,13 +111,13 @@ func TestSenderSendsOneFails(t *testing.T) { } func TestSenderSendsAllFail(t *testing.T) { - poster := &fakeDoer{ + poster := &fakeClient{ urls: []*url.URL{{Host: "am1:9090"}, {Host: "am2:9090"}}, postf: func(u *url.URL) error { return errors.New("no such host") }, } - s := NewSender(nil, nil, []AlertmanagerDoer{poster}) + s := NewSender(nil, nil, []AlertmanagerClient{poster}) s.Send(context.Background(), []*Alert{{}, {}}) diff --git a/pkg/alert/client.go b/pkg/alert/client.go index fbd216a24f..35694b2ed5 100644 --- a/pkg/alert/client.go +++ b/pkg/alert/client.go @@ -1,9 +1,9 @@ package alert import ( - "bytes" "context" "fmt" + "io" "net" "net/http" "net/url" @@ -158,10 +158,7 @@ func DefaultAlertmanagerConfig() AlertmanagerConfig { func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { *c = DefaultAlertmanagerConfig() type plain AlertmanagerConfig - if err := unmarshal((*plain)(c)); err != nil { - return err - } - return nil + return unmarshal((*plain)(c)) } // Alertmanager represents an HTTP client that can 
send alerts to a cluster of Alertmanager endpoints. @@ -279,8 +276,8 @@ func (a *Alertmanager) Endpoints() []*url.URL { } // Post sends a POST request to the given URL. -func (a *Alertmanager) Do(ctx context.Context, u *url.URL, b []byte) error { - req, err := http.NewRequest("POST", u.String(), bytes.NewReader(b)) +func (a *Alertmanager) Do(ctx context.Context, u *url.URL, r io.Reader) error { + req, err := http.NewRequest("POST", u.String(), r) if err != nil { return err } diff --git a/pkg/alert/client_test.go b/pkg/alert/client_test.go index 060e218e05..3fd94c9404 100644 --- a/pkg/alert/client_test.go +++ b/pkg/alert/client_test.go @@ -75,6 +75,10 @@ func TestBuildAlertmanagerConfiguration(t *testing.T) { Scheme: "http", }, }, + { + address: "://user:pass@localhost:9093", + err: true, + }, } { t.Run(tc.address, func(t *testing.T) { cfg, err := BuildAlertmanagerConfig(nil, tc.address, time.Duration(0)) diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index de28a5aa9a..38ecd2b39f 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -82,28 +82,6 @@ func serializeAlertingConfiguration(t *testing.T, cfg ...alert.AlertmanagerConfi return b } -func writeAlertmanagerFileSD(t *testing.T, path string, addrs ...string) { - group := targetgroup.Group{Targets: []model.LabelSet{}} - for _, addr := range addrs { - group.Targets = append(group.Targets, model.LabelSet{model.LabelName(model.AddressLabel): model.LabelValue(addr)}) - } - - b, err := yaml.Marshal([]*targetgroup.Group{&group}) - if err != nil { - t.Errorf("failed to serialize file SD configuration: %v", err) - return - } - - err = ioutil.WriteFile(path+".tmp", b, 0660) - if err != nil { - t.Errorf("failed to write file SD configuration: %v", err) - return - } - - err = os.Rename(path+".tmp", path) - testutil.Ok(t, err) -} - type mockAlertmanager struct { path string token string @@ -232,7 +210,7 @@ func TestRuleAlertmanagerHTTPClient(t *testing.T) { r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) q := querier(qAddr, a.New(), []address{r.GRPC}, nil) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) exit, err := e2eSpinup(t, ctx, q, r) if err != nil { t.Errorf("spinup failed: %v", err) @@ -293,7 +271,7 @@ func TestRuleAlertmanagerFileSD(t *testing.T) { r := rule(a.New(), a.New(), rulesDir, amCfg, []address{qAddr}, nil) q := querier(qAddr, a.New(), []address{r.GRPC}, nil) - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) exit, err := e2eSpinup(t, ctx, am, q, r) if err != nil { t.Errorf("spinup failed: %v", err) @@ -306,7 +284,7 @@ func TestRuleAlertmanagerFileSD(t *testing.T) { <-exit }() - // Wait for a couple of evaluations. + // Wait for a couple of evaluations and make sure that Alertmanager didn't receive anything. testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { select { case <-exit: @@ -340,8 +318,21 @@ func TestRuleAlertmanagerFileSD(t *testing.T) { return nil })) - // Update the Alertmanager file service discovery configuration. - writeAlertmanagerFileSD(t, filepath.Join(amDir, "targets.yaml"), am.HTTP.HostPort()) + // Add the Alertmanager address to the file SD directory. 
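+	// The target list is written to a temporary file and atomically renamed so the Ruler
+	// never reads a partially written file.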
+ fileSDPath := filepath.Join(amDir, "targets.yaml") + b, err := yaml.Marshal([]*targetgroup.Group{ + &targetgroup.Group{ + Targets: []model.LabelSet{ + model.LabelSet{ + model.LabelName(model.AddressLabel): model.LabelValue(am.HTTP.HostPort()), + }, + }, + }, + }) + testutil.Ok(t, err) + + testutil.Ok(t, ioutil.WriteFile(fileSDPath+".tmp", b, 0660)) + testutil.Ok(t, os.Rename(fileSDPath+".tmp", fileSDPath)) // Verify that alerts are received by Alertmanager. testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() (err error) { From 07f711b5709d9571b981fdf0217502a1034763b4 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Tue, 17 Dec 2019 11:15:52 +0100 Subject: [PATCH 09/10] Re-use dns.Provider for resolving Alertmanager addresses Signed-off-by: Simon Pasquier --- cmd/thanos/rule.go | 17 +++--- pkg/alert/client.go | 59 +++++++------------- pkg/alert/client_test.go | 101 +--------------------------------- pkg/discovery/dns/provider.go | 12 ++++ 4 files changed, 42 insertions(+), 147 deletions(-) diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 2a0e9e9969..629acdadfc 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -317,11 +317,19 @@ func runRule( alertingCfg.Alertmanagers = append(alertingCfg.Alertmanagers, cfg) } } + if len(alertingCfg.Alertmanagers) == 0 { level.Warn(logger).Log("msg", "no alertmanager configured") } + + amProvider := dns.NewProvider( + logger, + extprom.WrapRegistererWithPrefix("thanos_ruler_alertmanagers_", reg), + dns.ResolverType(dnsSDResolver), + ) for _, cfg := range alertingCfg.Alertmanagers { - am, err := alert.NewAlertmanager(logger, cfg) + // Each Alertmanager client needs its own DNS provider. + am, err := alert.NewAlertmanager(logger, cfg, amProvider.Clone()) if err != nil { return err } @@ -394,8 +402,6 @@ func runRule( } // Discover and resolve Alertmanager addresses. { - resolver := dns.NewResolver(dns.ResolverType(dnsSDResolver).ToResolver(logger)) - for i := range alertmgrs { am := alertmgrs[i] ctx, cancel := context.WithCancel(context.Background()) @@ -408,10 +414,7 @@ func runRule( g.Add(func() error { return runutil.Repeat(alertmgrsDNSSDInterval, ctx.Done(), func() error { - if err := am.Update(ctx, resolver); err != nil { - level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err) - alertMngrAddrResolutionErrors.Inc() - } + am.Resolve(ctx) return nil }) }, func(error) { diff --git a/pkg/alert/client.go b/pkg/alert/client.go index 35694b2ed5..f58b995a2b 100644 --- a/pkg/alert/client.go +++ b/pkg/alert/client.go @@ -14,7 +14,6 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/go-kit/kit/log/level" "github.com/pkg/errors" config_util "github.com/prometheus/common/config" "github.com/prometheus/common/model" @@ -161,6 +160,11 @@ func (c *AlertmanagerConfig) UnmarshalYAML(unmarshal func(interface{}) error) er return unmarshal((*plain)(c)) } +type AddressProvider interface { + Resolve(context.Context, []string) + Addresses() []string +} + // Alertmanager represents an HTTP client that can send alerts to a cluster of Alertmanager endpoints. type Alertmanager struct { logger log.Logger @@ -174,12 +178,11 @@ type Alertmanager struct { fileSDCache *cache.Cache fileDiscoverers []*file.Discovery - mtx sync.RWMutex - resolved []string + provider AddressProvider } // NewAlertmanager returns a new Alertmanager client. 
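+// The provider resolves and caches the Alertmanager addresses; each client should be given
+// its own provider instance (for example via Provider.Clone).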
-func NewAlertmanager(logger log.Logger, cfg AlertmanagerConfig) (*Alertmanager, error) { +func NewAlertmanager(logger log.Logger, cfg AlertmanagerConfig, provider AddressProvider) (*Alertmanager, error) { if logger == nil { logger = log.NewNopLogger() } @@ -210,6 +213,7 @@ func NewAlertmanager(logger log.Logger, cfg AlertmanagerConfig) (*Alertmanager, staticAddresses: cfg.StaticAddresses, fileSDCache: cache.New(), fileDiscoverers: discoverers, + provider: provider, }, nil } @@ -237,6 +241,12 @@ func BuildAlertmanagerConfig(logger log.Logger, address string, timeout time.Dur // Scheme is of the form "+". scheme = strings.TrimPrefix(scheme, prefix) host = prefix + parsed.Host + if qType == dns.A { + if _, _, err := net.SplitHostPort(parsed.Host); err != nil { + // The host port could be missing. Append the defaultAlertmanagerPort. + host = host + ":" + strconv.Itoa(defaultAlertmanagerPort) + } + } break } } @@ -260,10 +270,8 @@ func BuildAlertmanagerConfig(logger log.Logger, address string, timeout time.Dur // Endpoints returns the list of known Alertmanager endpoints. func (a *Alertmanager) Endpoints() []*url.URL { - a.mtx.RLock() - defer a.mtx.RUnlock() var urls []*url.URL - for _, addr := range a.resolved { + for _, addr := range a.provider.Addresses() { urls = append(urls, &url.URL{ Scheme: a.scheme, @@ -275,7 +283,7 @@ func (a *Alertmanager) Endpoints() []*url.URL { return urls } -// Post sends a POST request to the given URL. +// Do sends a POST request to the given URL. func (a *Alertmanager) Do(ctx context.Context, u *url.URL, r io.Reader) error { req, err := http.NewRequest("POST", u.String(), r) if err != nil { @@ -329,36 +337,7 @@ func (a *Alertmanager) Discover(ctx context.Context) { wg.Wait() } -// Update refreshes and resolves the list of targets. -func (a *Alertmanager) Update(ctx context.Context, resolver dns.Resolver) error { - var resolved []string - for _, addr := range append(a.fileSDCache.Addresses(), a.staticAddresses...) { - level.Debug(a.logger).Log("msg", "resolving address", "addr", addr) - qtypeAndName := strings.SplitN(addr, "+", 2) - if len(qtypeAndName) != 2 { - level.Debug(a.logger).Log("msg", "no lookup needed", "addr", addr) - resolved = append(resolved, addr) - continue - } - qtype, name := dns.QType(qtypeAndName[0]), qtypeAndName[1] - - // Get only the host and resolve it if needed. - host := name - if qtype == dns.A { - if _, _, err := net.SplitHostPort(host); err != nil { - // The host port could be missing. Append the defaultAlertmanagerPort. - host = host + ":" + strconv.Itoa(defaultAlertmanagerPort) - } - } - addrs, err := resolver.Resolve(ctx, host, qtype) - if err != nil { - return errors.Wrap(err, "failed to resolve alertmanager address") - } - level.Debug(a.logger).Log("msg", "address resolved", "addr", addr, "resolved", strings.Join(addrs, ",")) - resolved = append(resolved, addrs...) - } - a.mtx.Lock() - a.resolved = resolved - a.mtx.Unlock() - return nil +// Resolve refreshes and resolves the list of Alertmanager targets. 
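+// It merges the addresses discovered via file SD with the statically configured ones and
+// hands them to the address provider for resolution.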
+func (a *Alertmanager) Resolve(ctx context.Context) { + a.provider.Resolve(ctx, append(a.fileSDCache.Addresses(), a.staticAddresses...)) } diff --git a/pkg/alert/client_test.go b/pkg/alert/client_test.go index 3fd94c9404..8b29d51c9e 100644 --- a/pkg/alert/client_test.go +++ b/pkg/alert/client_test.go @@ -1,14 +1,9 @@ package alert import ( - "context" - "strings" "testing" "time" - "github.com/pkg/errors" - - "github.com/thanos-io/thanos/pkg/discovery/dns" "github.com/thanos-io/thanos/pkg/testutil" ) @@ -57,7 +52,7 @@ func TestBuildAlertmanagerConfiguration(t *testing.T) { { address: "dns+https://localhost/path/prefix/", expected: AlertmanagerConfig{ - StaticAddresses: []string{"dns+localhost"}, + StaticAddresses: []string{"dns+localhost:9093"}, Scheme: "https", PathPrefix: "/path/prefix/", }, @@ -91,97 +86,3 @@ func TestBuildAlertmanagerConfiguration(t *testing.T) { }) } } - -type mockEntry struct { - name string - qtype dns.QType -} - -type mockResolver struct { - entries map[mockEntry][]string - err error -} - -func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) { - if m.err != nil { - return nil, m.err - } - if res, ok := m.entries[mockEntry{name: name, qtype: qtype}]; ok { - return res, nil - } - return nil, errors.Errorf("mockResolver not found response for name: %s", name) -} - -func TestUpdate(t *testing.T) { - for _, tc := range []struct { - cfg AlertmanagerConfig - resolver mockResolver - - resolved []string - err bool - }{ - { - cfg: AlertmanagerConfig{ - StaticAddresses: []string{"dns+alertmanager.example.com:9095"}, - }, - resolver: mockResolver{ - entries: map[mockEntry][]string{ - mockEntry{name: "alertmanager.example.com:9095", qtype: dns.A}: []string{"1.1.1.1:9095", "2.2.2.2:9095"}, - }, - }, - resolved: []string{"1.1.1.1:9095", "2.2.2.2:9095"}, - }, - { - cfg: AlertmanagerConfig{ - StaticAddresses: []string{"dns+alertmanager.example.com"}, - }, - resolver: mockResolver{ - entries: map[mockEntry][]string{ - mockEntry{name: "alertmanager.example.com:9093", qtype: dns.A}: []string{"1.1.1.1:9093", "2.2.2.2:9093"}, - }, - }, - resolved: []string{"1.1.1.1:9093", "2.2.2.2:9093"}, - }, - { - cfg: AlertmanagerConfig{ - StaticAddresses: []string{"alertmanager.example.com:9096"}, - }, - resolved: []string{"alertmanager.example.com:9096"}, - }, - { - cfg: AlertmanagerConfig{ - StaticAddresses: []string{"dnssrv+_web._tcp.alertmanager.example.com"}, - }, - resolver: mockResolver{ - entries: map[mockEntry][]string{ - mockEntry{name: "_web._tcp.alertmanager.example.com", qtype: dns.SRV}: []string{"1.1.1.1:9097", "2.2.2.2:9097"}, - }, - }, - resolved: []string{"1.1.1.1:9097", "2.2.2.2:9097"}, - }, - { - cfg: AlertmanagerConfig{ - StaticAddresses: []string{"dnssrv+_web._tcp.notfound.example.com"}, - }, - resolver: mockResolver{ - entries: map[mockEntry][]string{}, - }, - err: true, - }, - } { - t.Run(strings.Join(tc.cfg.StaticAddresses, ","), func(t *testing.T) { - am, err := NewAlertmanager(nil, tc.cfg) - testutil.Ok(t, err) - ctx := context.Background() - err = am.Update(ctx, &tc.resolver) - if tc.err { - t.Logf("%v", err) - testutil.NotOk(t, err) - return - } - - testutil.Equals(t, tc.resolved, am.resolved) - }) - } - -} diff --git a/pkg/discovery/dns/provider.go b/pkg/discovery/dns/provider.go index 148143ea4d..332135ba17 100644 --- a/pkg/discovery/dns/provider.go +++ b/pkg/discovery/dns/provider.go @@ -76,6 +76,18 @@ func NewProvider(logger log.Logger, reg prometheus.Registerer, resolverType Reso return p } +// Clone returns a new 
provider from an existing one. +func (p *Provider) Clone() *Provider { + return &Provider{ + resolver: p.resolver, + resolved: make(map[string][]string), + logger: p.logger, + resolverAddrs: p.resolverAddrs, + resolverLookupsCount: p.resolverLookupsCount, + resolverFailuresCount: p.resolverFailuresCount, + } +} + // Resolve stores a list of provided addresses or their DNS records if requested. // Addresses prefixed with `dns+` or `dnssrv+` will be resolved through respective DNS lookup (A/AAAA or SRV). // defaultPort is used for non-SRV records when a port is not supplied. From f467acfc522ea54a4798dd6bf89be9a10aec4234 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Tue, 17 Dec 2019 17:04:41 +0100 Subject: [PATCH 10/10] update documentation Signed-off-by: Simon Pasquier --- CHANGELOG.md | 1 - cmd/thanos/rule.go | 2 +- docs/service-discovery.md | 4 ++-- test/e2e/rule_test.go | 11 +++++++++++ 4 files changed, 14 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35cdc6f7fd..b4634cb1dd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,6 @@ We use *breaking* word for marking changes that are not backward compatible (rel - [#1852](https://github.com/thanos-io/thanos/pull/1852) Add support for `AWS_CONTAINER_CREDENTIALS_FULL_URI` by upgrading to minio-go v6.0.44 - [#1854](https://github.com/thanos-io/thanos/pull/1854) Update Rule UI to support alerts count displaying and filtering. - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add TLS and authentication support for Alertmanager with the `--alertmanagers.config` and `--alertmanagers.config-file` CLI flags. See [documentation](docs/components/rule.md/#configuration) for further information. - - [#1838](https://github.com/thanos-io/thanos/pull/1838) Ruler: Add a new `--alertmanagers.sd-dns-interval` CLI option to specify the interval between DNS resolutions of Alertmanager hosts. ## [v0.9.0](https://github.com/thanos-io/thanos/releases/tag/v0.9.0) - 2019.12.03 diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 629acdadfc..9e3af8c8d4 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -328,7 +328,7 @@ func runRule( dns.ResolverType(dnsSDResolver), ) for _, cfg := range alertingCfg.Alertmanagers { - // Each Alertmanager client needs its own DNS provider. + // Each Alertmanager client has a different list of targets thus each needs its own DNS provider. am, err := alert.NewAlertmanager(logger, cfg, amProvider.Clone()) if err != nil { return err diff --git a/docs/service-discovery.md b/docs/service-discovery.md index 0c76b733c0..34cf0914cd 100644 --- a/docs/service-discovery.md +++ b/docs/service-discovery.md @@ -33,7 +33,7 @@ The repeatable flag `--store=` can be used to specify a `StoreAPI` that ` The repeatable flag `--query=` can be used to specify a `QueryAPI` that `Thanos Rule` should use. -The repeatable flag `--alertmanagers.url=` can be used to specify a `Alertmanager API` that `Thanos Rule` should use. +`Thanos Rule` also supports the configuration of Alertmanager endpoints using YAML with the `--alertmanagers.config=` and `--alertmanagers.config-file=` flags in the `StaticAddress` section. ## File Service Discovery @@ -77,7 +77,7 @@ Again, the `` can be a glob pattern. The flag `--query.sd-interval=<5m>` can be used to change the fallback re-read interval. -`Thanos Rule` also supports the configuration of Alertmanager endpoints using YAML with the `--alertmanagers.config=` and `--alertmanagers.config-file=` flags. 
+`Thanos Rule` also supports the configuration of Alertmanager endpoints using YAML with the `--alertmanagers.config=` and `--alertmanagers.config-file=` flags in the `FileSDFiles` section.

## DNS Service Discovery

diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go
index 38ecd2b39f..7cfe508f53 100644
--- a/test/e2e/rule_test.go
+++ b/test/e2e/rule_test.go
@@ -157,6 +157,17 @@ func (m *mockAlertmanager) ServeHTTP(resp http.ResponseWriter, req *http.Request
 m.mtx.Unlock()
 }
 
+// TestRuleAlertmanagerHTTPClient verifies that Thanos Ruler can send alerts to
+// Alertmanager in various setups:
+// * Plain HTTP.
+// * HTTPS with a custom CA.
+// * API with a prefix.
+// * API protected by bearer token authentication.
+//
+// Because the real Alertmanager exposes only plain HTTP without authentication,
+// the test uses a mock server instead of the "real" Alertmanager service.
+// The other end-to-end tests run against the "real" Alertmanager implementation.
 func TestRuleAlertmanagerHTTPClient(t *testing.T) {
 	a := newLocalAddresser()