diff --git a/CHANGELOG.md b/CHANGELOG.md index d77ee42585..f08a074502 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,17 @@ - `-<prefix>.s3.sse.kms-encryption-context` * [FEATURE] Querier: Enable `@ <timestamp>` modifier in PromQL using the new `-querier.at-modifier-enabled` flag. #3744 * [FEATURE] Overrides Exporter: Add `overrides-exporter` module for exposing per-tenant resource limit overrides as metrics. It is not included in `all` target, and must be explicitly enabled. #3785 +* [FEATURE] Experimental thanosconvert: introduce an experimental tool `thanosconvert` to migrate Thanos block metadata to Cortex metadata. #3770 +* [FEATURE] Alertmanager: It now shards the `/api/v1/alerts` API using the ring when sharding is enabled. #3671 + * Added `-alertmanager.max-recv-msg-size` (defaults to 16M) to limit the size of the HTTP request body handled by the alertmanager. + * New flags added for communication between alertmanagers: + * `-alertmanager.alertmanager-client.remote-timeout` + * `-alertmanager.alertmanager-client.tls-enabled` + * `-alertmanager.alertmanager-client.tls-cert-path` + * `-alertmanager.alertmanager-client.tls-key-path` + * `-alertmanager.alertmanager-client.tls-ca-path` + * `-alertmanager.alertmanager-client.tls-server-name` + * `-alertmanager.alertmanager-client.tls-insecure-skip-verify` * [ENHANCEMENT] Ruler: Add TLS and explicit basic authentication configuration options for the HTTP client the ruler uses to communicate with the alertmanager. #3752 * `-ruler.alertmanager-client.basic-auth-username`: Configure the basic authentication username used by the client. Takes precedence over a URL configured username. * `-ruler.alertmanager-client.basic-auth-password`: Configure the basic authentication password used by the client. Takes precedence over a URL configured password. @@ -21,7 +32,6 @@ * `-ruler.alertmanager-client.tls-insecure-skip-verify`: Boolean to disable verifying the certificate. * `-ruler.alertmanager-client.tls-key-path`: File path to the TLS key certificate. * `-ruler.alertmanager-client.tls-server-name`: Expected name on the TLS certificate. -* [FEATURE] Experimental thanosconvert: introduce an experimental tool `thanosconvert` to migrate Thanos block metadata to Cortex metadata. #3770 * [ENHANCEMENT] Ingester: exposed metric `cortex_ingester_oldest_unshipped_block_timestamp_seconds`, tracking the unix timestamp of the oldest TSDB block not shipped to the storage yet. #3705 * [ENHANCEMENT] Prometheus upgraded. #3739 * Avoid unnecessary `runtime.GC()` during compactions.
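For context on the sharding entry above: tenants are mapped onto alertmanager instances by hashing the tenant ID onto the ring, as done by the `shardByUser` helper added in `pkg/alertmanager/distributor.go` later in this diff. A minimal, self-contained sketch of that mapping (the tenant IDs and the standalone `main` are illustrative only, not part of the change set):

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardByUser mirrors the helper introduced in pkg/alertmanager/distributor.go:
// the tenant ID is hashed with FNV-1a and the resulting 32-bit token decides
// which alertmanager instances own the tenant on the ring.
func shardByUser(userID string) uint32 {
	h := fnv.New32a()
	_, _ = h.Write([]byte(userID)) // fnv's Write never returns an error.
	return h.Sum32()
}

func main() {
	// Illustrative tenant IDs only: different tenants map to different ring
	// tokens and therefore, with enough instances, to different alertmanagers.
	for _, tenant := range []string{"tenant-a", "tenant-b", "tenant-c"} {
		fmt.Printf("%-9s -> ring token %d\n", tenant, shardByUser(tenant))
	}
}
```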
diff --git a/Makefile b/Makefile index fa7e4a5570..bbf18c353e 100644 --- a/Makefile +++ b/Makefile @@ -89,6 +89,7 @@ pkg/scheduler/schedulerpb/scheduler.pb.go: pkg/scheduler/schedulerpb/scheduler.p pkg/storegateway/storegatewaypb/gateway.pb.go: pkg/storegateway/storegatewaypb/gateway.proto pkg/chunk/grpc/grpc.pb.go: pkg/chunk/grpc/grpc.proto tools/blocksconvert/scheduler.pb.go: tools/blocksconvert/scheduler.proto +pkg/alertmanager/alertmanagerpb/alertmanager.pb.go: pkg/alertmanager/alertmanagerpb/alertmanager.proto all: $(UPTODATE_FILES) test: protos diff --git a/development/tsdb-blocks-storage-s3/config/cortex.yaml b/development/tsdb-blocks-storage-s3/config/cortex.yaml index bf5e147894..32d0ab0c82 100644 --- a/development/tsdb-blocks-storage-s3/config/cortex.yaml +++ b/development/tsdb-blocks-storage-s3/config/cortex.yaml @@ -86,6 +86,15 @@ ruler: alertmanager: enable_api: true + sharding_enabled: true + sharding_ring: + replication_factor: 3 + heartbeat_period: 5s + heartbeat_timeout: 15s + kvstore: + store: consul + consul: + host: consul:8500 storage: type: s3 s3: diff --git a/development/tsdb-blocks-storage-s3/docker-compose.yml b/development/tsdb-blocks-storage-s3/docker-compose.yml index 87a4924db6..ef62c70266 100644 --- a/development/tsdb-blocks-storage-s3/docker-compose.yml +++ b/development/tsdb-blocks-storage-s3/docker-compose.yml @@ -109,7 +109,7 @@ services: volumes: - ./config:/cortex/config - .data-ingester-2:/tmp/cortex-tsdb-ingester:delegated - + querier: build: context: . @@ -215,18 +215,48 @@ services: volumes: - ./config:/cortex/config - alertmanager: + alertmanager-1: + build: + context: . + dockerfile: dev.dockerfile + image: cortex + command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18031 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=alertmanager -server.http-listen-port=8031 -server.grpc-listen-port=9031 -alertmanager.web.external-url=localhost:8031"] + depends_on: + - consul + - minio + ports: + - 8031:8031 + - 18031:18031 + volumes: + - ./config:/cortex/config + + alertmanager-2: + build: + context: . + dockerfile: dev.dockerfile + image: cortex + command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18032 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=alertmanager -server.http-listen-port=8032 -server.grpc-listen-port=9032 -alertmanager.web.external-url=localhost:8032"] + depends_on: + - consul + - minio + ports: + - 8032:8032 + - 18032:18032 + volumes: + - ./config:/cortex/config + + alertmanager-3: build: context: . 
dockerfile: dev.dockerfile image: cortex - command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18010 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=alertmanager -server.http-listen-port=8010 -server.grpc-listen-port=9010 -alertmanager.web.external-url=localhost:8010"] + command: ["sh", "-c", "sleep 3 && exec ./dlv exec ./cortex --listen=:18033 --headless=true --api-version=2 --accept-multiclient --continue -- -config.file=./config/cortex.yaml -target=alertmanager -server.http-listen-port=8033 -server.grpc-listen-port=9033 -alertmanager.web.external-url=localhost:8033"] depends_on: - consul - minio ports: - - 8010:8010 - - 18010:18010 + - 8033:8033 + - 18033:18033 volumes: - ./config:/cortex/config diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 850693ba38..4cc5002f69 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1572,6 +1572,10 @@ The `alertmanager_config` configures the Cortex alertmanager. # CLI flag: -alertmanager.configs.poll-interval [poll_interval: <duration> | default = 15s] +# Maximum size (bytes) of an accepted HTTP request body. +# CLI flag: -alertmanager.max-recv-msg-size +[max_recv_msg_size: <int> | default = 16777216] + # Deprecated. Use -alertmanager.cluster.listen-address instead. # CLI flag: -cluster.listen-address [cluster_bind_address: <string> | default = "0.0.0.0:9094"] @@ -1840,6 +1844,40 @@ cluster: # Enable the experimental alertmanager config api. # CLI flag: -experimental.alertmanager.enable-api [enable_api: <boolean> | default = false] + +alertmanager_client: + # Timeout for downstream alertmanagers. + # CLI flag: -alertmanager.alertmanager-client.remote-timeout + [remote_timeout: <duration> | default = 2s] + + # Enable TLS in the GRPC client. This flag needs to be enabled when any other + # TLS flag is set. If set to false, insecure connection to gRPC server will be + # used. + # CLI flag: -alertmanager.alertmanager-client.tls-enabled + [tls_enabled: <boolean> | default = false] + + # Path to the client certificate file, which will be used for authenticating + # with the server. Also requires the key path to be configured. + # CLI flag: -alertmanager.alertmanager-client.tls-cert-path + [tls_cert_path: <string> | default = ""] + + # Path to the key file for the client certificate. Also requires the client + # certificate to be configured. + # CLI flag: -alertmanager.alertmanager-client.tls-key-path + [tls_key_path: <string> | default = ""] + + # Path to the CA certificates file to validate server certificate against. If + # not set, the host's root CA certificates are used. + # CLI flag: -alertmanager.alertmanager-client.tls-ca-path + [tls_ca_path: <string> | default = ""] + + # Override the expected name on the server certificate. + # CLI flag: -alertmanager.alertmanager-client.tls-server-name + [tls_server_name: <string> | default = ""] + + # Skip validating server certificate.
+ # CLI flag: -alertmanager.alertmanager-client.tls-insecure-skip-verify + [tls_insecure_skip_verify: <boolean> | default = false] ``` ### `table_manager_config` diff --git a/pkg/alertmanager/alertmanager_client.go b/pkg/alertmanager/alertmanager_client.go new file mode 100644 index 0000000000..963b531324 --- /dev/null +++ b/pkg/alertmanager/alertmanager_client.go @@ -0,0 +1,132 @@ +package alertmanager + +import ( + "flag" + "time" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb" + "github.com/cortexproject/cortex/pkg/ring/client" + "github.com/cortexproject/cortex/pkg/util/grpcclient" + "github.com/cortexproject/cortex/pkg/util/tls" +) + +// ClientsPool is the interface used to get the client from the pool for a specified address. +type ClientsPool interface { + // GetClientFor returns the alertmanager client for the given address. + GetClientFor(addr string) (Client, error) +} + +// Client is the interface that should be implemented by any client used to read/write data to an alertmanager via GRPC. +type Client interface { + alertmanagerpb.AlertmanagerClient + + // RemoteAddress returns the address of the remote alertmanager and is used to uniquely + // identify an alertmanager instance. + RemoteAddress() string +} + +// ClientConfig is the configuration struct for the alertmanager client. +type ClientConfig struct { + RemoteTimeout time.Duration `yaml:"remote_timeout"` + TLSEnabled bool `yaml:"tls_enabled"` + TLS tls.ClientConfig `yaml:",inline"` +} + +// RegisterFlagsWithPrefix registers flags with prefix. +func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS in the GRPC client. This flag needs to be enabled when any other TLS flag is set. If set to false, insecure connection to gRPC server will be used.") + f.DurationVar(&cfg.RemoteTimeout, prefix+".remote-timeout", 2*time.Second, "Timeout for downstream alertmanagers.") + cfg.TLS.RegisterFlagsWithPrefix(prefix, f) +} + +type alertmanagerClientsPool struct { + pool *client.Pool +} + +func newAlertmanagerClientsPool(discovery client.PoolServiceDiscovery, amClientCfg ClientConfig, logger log.Logger, reg prometheus.Registerer) ClientsPool { + // We prefer sane defaults instead of exposing further config options.
+ grpcCfg := grpcclient.Config{ + MaxRecvMsgSize: 16 * 1024 * 1024, + MaxSendMsgSize: 4 * 1024 * 1024, + GRPCCompression: "", + RateLimit: 0, + RateLimitBurst: 0, + BackoffOnRatelimits: false, + TLSEnabled: amClientCfg.TLSEnabled, + TLS: amClientCfg.TLS, + } + + requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "cortex_alertmanager_distributor_client_request_duration_seconds", + Help: "Time spent executing requests from an alertmanager to another alertmanager.", + Buckets: prometheus.ExponentialBuckets(0.008, 4, 7), + }, []string{"operation", "status_code"}) + + factory := func(addr string) (client.PoolClient, error) { + return dialAlertmanagerClient(grpcCfg, addr, requestDuration) + } + + poolCfg := client.PoolConfig{ + CheckInterval: time.Minute, + HealthCheckEnabled: true, + HealthCheckTimeout: 10 * time.Second, + } + + clientsCount := promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Namespace: "cortex", + Name: "alertmanager_distributor_clients", + Help: "The current number of alertmanager distributor clients in the pool.", + }) + + return &alertmanagerClientsPool{pool: client.NewPool("alertmanager", poolCfg, discovery, factory, clientsCount, logger)} +} + +func (f *alertmanagerClientsPool) GetClientFor(addr string) (Client, error) { + c, err := f.pool.GetClientFor(addr) + if err != nil { + return nil, err + } + return c.(Client), nil +} + +func dialAlertmanagerClient(cfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*alertmanagerClient, error) { + opts, err := cfg.DialOption(grpcclient.Instrument(requestDuration)) + if err != nil { + return nil, err + } + conn, err := grpc.Dial(addr, opts...) + if err != nil { + return nil, errors.Wrapf(err, "failed to dial alertmanager %s", addr) + } + + return &alertmanagerClient{ + AlertmanagerClient: alertmanagerpb.NewAlertmanagerClient(conn), + HealthClient: grpc_health_v1.NewHealthClient(conn), + conn: conn, + }, nil +} + +type alertmanagerClient struct { + alertmanagerpb.AlertmanagerClient + grpc_health_v1.HealthClient + conn *grpc.ClientConn +} + +func (c *alertmanagerClient) Close() error { + return c.conn.Close() +} + +func (c *alertmanagerClient) String() string { + return c.RemoteAddress() +} + +func (c *alertmanagerClient) RemoteAddress() string { + return c.conn.Target() +} diff --git a/pkg/alertmanager/alertmanager_ring.go b/pkg/alertmanager/alertmanager_ring.go index 9fe2d9dea1..9b8d6f00f5 100644 --- a/pkg/alertmanager/alertmanager_ring.go +++ b/pkg/alertmanager/alertmanager_ring.go @@ -26,7 +26,7 @@ const ( RingNumTokens = 128 ) -// RingOp is the operation used for distributing tenants between alertmanagers. +// RingOp is the operation used for reading/writing to the alertmanagers. var RingOp = ring.NewOp([]ring.IngesterState{ring.ACTIVE}, func(s ring.IngesterState) bool { // Only ACTIVE Alertmanager get requests. If instance is not ACTIVE, we need to find another Alertmanager. 
return s != ring.ACTIVE @@ -77,7 +77,7 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { cfg.InstanceInterfaceNames = []string{"eth0", "en0"} f.Var((*flagext.StringSlice)(&cfg.InstanceInterfaceNames), rfprefix+"instance-interface-names", "Name of network interface to read address from.") f.StringVar(&cfg.InstanceAddr, rfprefix+"instance-addr", "", "IP address to advertise in the ring.") - f.IntVar(&cfg.InstancePort, rfprefix+"instance-port", 0, "Port to advertise in the ring (defaults to server.http-listen-port).") + f.IntVar(&cfg.InstancePort, rfprefix+"instance-port", 0, "Port to advertise in the ring (defaults to server.grpc-listen-port).") f.StringVar(&cfg.InstanceID, rfprefix+"instance-id", hostname, "Instance ID to register in the ring.") cfg.RingCheckPeriod = 5 * time.Second diff --git a/pkg/alertmanager/alertmanagerpb/alertmanager.pb.go b/pkg/alertmanager/alertmanagerpb/alertmanager.pb.go new file mode 100644 index 0000000000..9e3c2c4036 --- /dev/null +++ b/pkg/alertmanager/alertmanagerpb/alertmanager.pb.go @@ -0,0 +1,128 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: alertmanager.proto + +package alertmanagerpb + +import ( + context "context" + fmt "fmt" + _ "github.com/gogo/protobuf/gogoproto" + proto "github.com/gogo/protobuf/proto" + httpgrpc "github.com/weaveworks/common/httpgrpc" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. 
+const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +func init() { proto.RegisterFile("alertmanager.proto", fileDescriptor_e60437b6e0c74c9a) } + +var fileDescriptor_e60437b6e0c74c9a = []byte{ + // 227 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4a, 0xcc, 0x49, 0x2d, + 0x2a, 0xc9, 0x4d, 0xcc, 0x4b, 0x4c, 0x4f, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, + 0x43, 0x16, 0x2b, 0x48, 0x92, 0xd2, 0x4d, 0xcf, 0x2c, 0xc9, 0x28, 0x4d, 0xd2, 0x4b, 0xce, 0xcf, + 0xd5, 0x4f, 0xcf, 0x4f, 0xcf, 0xd7, 0x07, 0x2b, 0x4b, 0x2a, 0x4d, 0x03, 0xf3, 0xc0, 0x1c, 0x30, + 0x0b, 0xa2, 0x5d, 0xca, 0x04, 0x49, 0x79, 0x79, 0x6a, 0x62, 0x59, 0x6a, 0x79, 0x7e, 0x51, 0x76, + 0xb1, 0x7e, 0x72, 0x7e, 0x6e, 0x6e, 0x7e, 0x9e, 0x7e, 0x46, 0x49, 0x49, 0x41, 0x7a, 0x51, 0x41, + 0x32, 0x9c, 0x01, 0xd1, 0x65, 0x14, 0xc0, 0xc5, 0xe3, 0x88, 0x64, 0xad, 0x90, 0x03, 0x17, 0xaf, + 0x47, 0x62, 0x5e, 0x4a, 0x4e, 0x6a, 0x50, 0x6a, 0x61, 0x69, 0x6a, 0x71, 0x89, 0x90, 0xa8, 0x1e, + 0x5c, 0x87, 0x47, 0x48, 0x48, 0x00, 0x54, 0x58, 0x4a, 0x0c, 0x5d, 0xb8, 0xb8, 0x20, 0x3f, 0xaf, + 0x38, 0x55, 0x89, 0xc1, 0xc9, 0xe5, 0xc2, 0x43, 0x39, 0x86, 0x1b, 0x0f, 0xe5, 0x18, 0x3e, 0x3c, + 0x94, 0x63, 0x6c, 0x78, 0x24, 0xc7, 0xb8, 0xe2, 0x91, 0x1c, 0xe3, 0x89, 0x47, 0x72, 0x8c, 0x17, + 0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0xf8, 0xe2, 0x91, 0x1c, 0xc3, 0x87, 0x47, 0x72, 0x8c, + 0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x14, 0x9a, 0xe7, + 0x93, 0xd8, 0xc0, 0xce, 0x33, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0x5f, 0xb2, 0x09, 0x8d, 0x29, + 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// AlertmanagerClient is the client API for Alertmanager service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type AlertmanagerClient interface { + HandleRequest(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) +} + +type alertmanagerClient struct { + cc *grpc.ClientConn +} + +func NewAlertmanagerClient(cc *grpc.ClientConn) AlertmanagerClient { + return &alertmanagerClient{cc} +} + +func (c *alertmanagerClient) HandleRequest(ctx context.Context, in *httpgrpc.HTTPRequest, opts ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + out := new(httpgrpc.HTTPResponse) + err := c.cc.Invoke(ctx, "/alertmanagerpb.Alertmanager/HandleRequest", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// AlertmanagerServer is the server API for Alertmanager service. +type AlertmanagerServer interface { + HandleRequest(context.Context, *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) +} + +// UnimplementedAlertmanagerServer can be embedded to have forward compatible implementations. 
+type UnimplementedAlertmanagerServer struct { +} + +func (*UnimplementedAlertmanagerServer) HandleRequest(ctx context.Context, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method HandleRequest not implemented") +} + +func RegisterAlertmanagerServer(s *grpc.Server, srv AlertmanagerServer) { + s.RegisterService(&_Alertmanager_serviceDesc, srv) +} + +func _Alertmanager_HandleRequest_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(httpgrpc.HTTPRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(AlertmanagerServer).HandleRequest(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/alertmanagerpb.Alertmanager/HandleRequest", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(AlertmanagerServer).HandleRequest(ctx, req.(*httpgrpc.HTTPRequest)) + } + return interceptor(ctx, in, info, handler) +} + +var _Alertmanager_serviceDesc = grpc.ServiceDesc{ + ServiceName: "alertmanagerpb.Alertmanager", + HandlerType: (*AlertmanagerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "HandleRequest", + Handler: _Alertmanager_HandleRequest_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "alertmanager.proto", +} diff --git a/pkg/alertmanager/alertmanagerpb/alertmanager.proto b/pkg/alertmanager/alertmanagerpb/alertmanager.proto new file mode 100644 index 0000000000..bac88baa7c --- /dev/null +++ b/pkg/alertmanager/alertmanagerpb/alertmanager.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +package alertmanagerpb; + +option go_package = "alertmanagerpb"; + +import "github.com/gogo/protobuf/gogoproto/gogo.proto"; +import "github.com/weaveworks/common/httpgrpc/httpgrpc.proto"; + +option (gogoproto.marshaler_all) = true; +option (gogoproto.unmarshaler_all) = true; + +// Alertmanager interface exposed to the Alertmanager Distributor +service Alertmanager { + rpc HandleRequest(httpgrpc.HTTPRequest) returns(httpgrpc.HTTPResponse) {}; +} diff --git a/pkg/alertmanager/distributor.go b/pkg/alertmanager/distributor.go new file mode 100644 index 0000000000..7d304dd8d4 --- /dev/null +++ b/pkg/alertmanager/distributor.go @@ -0,0 +1,248 @@ +package alertmanager + +import ( + "context" + "hash/fnv" + "io/ioutil" + "math/rand" + "net/http" + "strings" + "sync" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/client" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/services" +) + +// Distributor forwards requests to individual alertmanagers. 
+type Distributor struct { + services.Service + + cfg ClientConfig + maxRecvMsgSize int64 + requestsInFlight sync.WaitGroup + + alertmanagerRing ring.ReadRing + alertmanagerClientsPool ClientsPool + + logger log.Logger +} + +// NewDistributor constructs a new Distributor +func NewDistributor(cfg ClientConfig, maxRecvMsgSize int64, alertmanagersRing *ring.Ring, alertmanagerClientsPool ClientsPool, logger log.Logger, reg prometheus.Registerer) (d *Distributor, err error) { + if alertmanagerClientsPool == nil { + alertmanagerClientsPool = newAlertmanagerClientsPool(client.NewRingServiceDiscovery(alertmanagersRing), cfg, logger, reg) + } + + d = &Distributor{ + cfg: cfg, + logger: logger, + maxRecvMsgSize: maxRecvMsgSize, + alertmanagerRing: alertmanagersRing, + alertmanagerClientsPool: alertmanagerClientsPool, + } + + d.Service = services.NewBasicService(nil, d.running, nil) + return d, nil +} + +func (d *Distributor) running(ctx context.Context) error { + <-ctx.Done() + d.requestsInFlight.Wait() + return nil +} + +// IsPathSupported returns true if the given route is currently supported by the Distributor. +func (d *Distributor) IsPathSupported(path string) bool { + // API can be found at https://petstore.swagger.io/?url=https://raw.githubusercontent.com/prometheus/alertmanager/master/api/v2/openapi.yaml. + return strings.HasSuffix(path, "/alerts") || + strings.HasSuffix(path, "/alerts/groups") +} + +// DistributeRequest shards the writes and returns as soon as the quorum is satisfied. +// In case of reads, it proxies the request to one of the alertmanagers. +// DistributeRequest assumes that the caller has verified IsPathSupported returns +// true for the route. +func (d *Distributor) DistributeRequest(w http.ResponseWriter, r *http.Request) { + d.requestsInFlight.Add(1) + defer d.requestsInFlight.Done() + + userID, err := tenant.TenantID(r.Context()) + if err != nil { + http.Error(w, err.Error(), http.StatusUnauthorized) + return + } + + logger := util_log.WithContext(r.Context(), d.logger) + + if r.Method == http.MethodGet || r.Method == http.MethodHead { + d.doRead(userID, w, r, logger) + } else { + d.doWrite(userID, w, r, logger) + } +} + +func (d *Distributor) doWrite(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) { + var body []byte + var err error + if r.Body != nil { + body, err = ioutil.ReadAll(http.MaxBytesReader(w, r.Body, d.maxRecvMsgSize)) + if err != nil { + if util.IsRequestBodyTooLarge(err) { + http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge) + return + } + level.Error(logger).Log("msg", "failed to read the request body during write", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + } + + var firstSuccessfulResponse *httpgrpc.HTTPResponse + var firstSuccessfulResponseMtx sync.Mutex + grpcHeaders := httpToHttpgrpcHeaders(r.Header) + err = ring.DoBatch(r.Context(), RingOp, d.alertmanagerRing, []uint32{shardByUser(userID)}, func(am ring.InstanceDesc, _ []int) error { + // Use a background context to make sure all alertmanagers get the request even if we return early. 
+ localCtx := user.InjectOrgID(context.Background(), userID) + sp, localCtx := opentracing.StartSpanFromContext(localCtx, "Distributor.doWrite") + defer sp.Finish() + + resp, err := d.doRequest(localCtx, am, &httpgrpc.HTTPRequest{ + Method: r.Method, + Url: r.RequestURI, + Body: body, + Headers: grpcHeaders, + }) + if err != nil { + return err + } + + if resp.Code/100 != 2 { + return httpgrpc.ErrorFromHTTPResponse(resp) + } + + firstSuccessfulResponseMtx.Lock() + if firstSuccessfulResponse == nil { + firstSuccessfulResponse = resp + } + firstSuccessfulResponseMtx.Unlock() + + return nil + }, func() {}) + + if err != nil { + respondFromError(err, w, logger) + return + } + + firstSuccessfulResponseMtx.Lock() // Another request might be ongoing after quorum. + resp := firstSuccessfulResponse + firstSuccessfulResponseMtx.Unlock() + + if resp != nil { + respondFromHTTPGRPCResponse(w, resp) + } else { + // This should not happen. + level.Error(logger).Log("msg", "distributor did not receive any response from alertmanagers, but there were no errors") + w.WriteHeader(http.StatusInternalServerError) + } +} + +func (d *Distributor) doRead(userID string, w http.ResponseWriter, r *http.Request, logger log.Logger) { + key := shardByUser(userID) + replicationSet, err := d.alertmanagerRing.Get(key, RingOp, nil, nil, nil) + if err != nil { + level.Error(logger).Log("msg", "failed to get replication set from the ring", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + + body, err := ioutil.ReadAll(http.MaxBytesReader(w, r.Body, d.maxRecvMsgSize)) + if err != nil { + if util.IsRequestBodyTooLarge(err) { + http.Error(w, "Request body too large", http.StatusRequestEntityTooLarge) + return + } + level.Error(logger).Log("msg", "failed to read the request body during read", "err", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + req := &httpgrpc.HTTPRequest{ + Method: r.Method, + Url: r.RequestURI, + Body: body, + Headers: httpToHttpgrpcHeaders(r.Header), + } + + sp, ctx := opentracing.StartSpanFromContext(r.Context(), "Distributor.doRead") + defer sp.Finish() + // Until we have a mechanism to combine the results from multiple alertmanagers, + // we forward the request to only one of the alertmanagers.
+ amDesc := replicationSet.Ingesters[rand.Intn(len(replicationSet.Ingesters))] + resp, err := d.doRequest(ctx, amDesc, req) + if err != nil { + respondFromError(err, w, logger) + return + } + + respondFromHTTPGRPCResponse(w, resp) +} + +func respondFromError(err error, w http.ResponseWriter, logger log.Logger) { + httpResp, ok := httpgrpc.HTTPResponseFromError(errors.Cause(err)) + if !ok { + level.Error(logger).Log("msg", "failed to process the request to the alertmanager", "err", err) + http.Error(w, "Failed to process the request to the alertmanager", http.StatusInternalServerError) + return + } + respondFromHTTPGRPCResponse(w, httpResp) +} + +func respondFromHTTPGRPCResponse(w http.ResponseWriter, httpResp *httpgrpc.HTTPResponse) { + for _, h := range httpResp.Headers { + for _, v := range h.Values { + w.Header().Add(h.Key, v) + } + } + w.WriteHeader(int(httpResp.Code)) + w.Write(httpResp.Body) //nolint +} + +func (d *Distributor) doRequest(ctx context.Context, am ring.InstanceDesc, req *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + ctx, cancel := context.WithTimeout(ctx, d.cfg.RemoteTimeout) + defer cancel() + amClient, err := d.alertmanagerClientsPool.GetClientFor(am.Addr) + if err != nil { + return nil, errors.Wrapf(err, "failed to get alertmanager client from pool (alertmanager address: %s)", am.Addr) + } + + return amClient.HandleRequest(ctx, req) +} + +func shardByUser(userID string) uint32 { + ringHasher := fnv.New32a() + // Hasher never returns err. + _, _ = ringHasher.Write([]byte(userID)) + return ringHasher.Sum32() +} + +func httpToHttpgrpcHeaders(hs http.Header) []*httpgrpc.Header { + result := make([]*httpgrpc.Header, 0, len(hs)) + for k, vs := range hs { + result = append(result, &httpgrpc.Header{ + Key: k, + Values: vs, + }) + } + return result +} diff --git a/pkg/alertmanager/distributor_test.go b/pkg/alertmanager/distributor_test.go new file mode 100644 index 0000000000..2723a7a078 --- /dev/null +++ b/pkg/alertmanager/distributor_test.go @@ -0,0 +1,292 @@ +package alertmanager + +import ( + "bytes" + "context" + "errors" + "fmt" + "math" + "net/http" + "net/http/httptest" + "net/url" + "sync" + "testing" + "time" + + "github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/ring/kv/consul" + "github.com/cortexproject/cortex/pkg/util/flagext" + util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/cortexproject/cortex/pkg/util/test" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/user" + "google.golang.org/grpc" + "google.golang.org/grpc/health/grpc_health_v1" +) + +func TestDistributor_DistributeRequest(t *testing.T) { + cases := []struct { + name string + numAM, numHappyAM int + replicationFactor int + isRead bool + expStatusCode int + expectedTotalCalls int + headersNotPreserved bool + }{ + { + name: "Simple AM request, all AM healthy", + numAM: 4, + numHappyAM: 4, + replicationFactor: 3, + expStatusCode: http.StatusOK, + expectedTotalCalls: 3, + }, { + name: "Less than quorum AM available", + numAM: 1, + numHappyAM: 1, + replicationFactor: 3, + expStatusCode: http.StatusInternalServerError, + expectedTotalCalls: 0, + headersNotPreserved: true, // There is nothing to preserve since it does not hit any AM. 
+ }, { + name: "Less than quorum AM succeed", + numAM: 5, + numHappyAM: 3, // Though we have 3 happy, it will hit >1 unhappy AM. + replicationFactor: 3, + expStatusCode: http.StatusInternalServerError, + expectedTotalCalls: 3, + }, { + name: "Read is sent to only 1 AM", + numAM: 5, + numHappyAM: 5, + replicationFactor: 3, + isRead: true, + expStatusCode: http.StatusOK, + expectedTotalCalls: 1, + }, + } + + route := "/alertmanager/api/v1/alerts" + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + d, ams, cleanup := prepare(t, c.numAM, c.numHappyAM, c.replicationFactor) + t.Cleanup(cleanup) + + ctx := user.InjectOrgID(context.Background(), "1") + + url := "http://127.0.0.1:9999" + route + req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader([]byte{1, 2, 3, 4})) + require.NoError(t, err) + if c.isRead { + req.Method = http.MethodGet + } + req.RequestURI = url + + w := httptest.NewRecorder() + d.DistributeRequest(w, req) + resp := w.Result() + + require.Equal(t, c.expStatusCode, resp.StatusCode) + + if !c.headersNotPreserved { + // Making sure the headers are not altered. + contentType := []string{"it-is-ok"} + contentTypeOptions := []string{"ok-option-1", "ok-option-2"} + if resp.StatusCode != http.StatusOK { + contentType = []string{"it-is-not-ok"} + contentTypeOptions = []string{"not-ok-option-1", "not-ok-option-2"} + } + require.Equal(t, contentType, resp.Header.Values("Content-Type")) + require.Equal(t, contentTypeOptions, resp.Header.Values("X-Content-Type-Options")) + } + + // Since the response is sent as soon as the quorum is reached, when we + // reach this point the 3rd AM may not have received the request yet. + // To avoid flaky test we retry until we hit the desired state within a reasonable timeout. + test.Poll(t, time.Second, c.expectedTotalCalls, func() interface{} { + totalReqCount := 0 + for _, a := range ams { + reqCount := a.requestsCount(route) + // AM should not get duplicate requests. + require.True(t, reqCount <= 1, "duplicate requests %d", reqCount) + totalReqCount += reqCount + } + + return totalReqCount + }) + }) + } + +} + +func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int) (*Distributor, []*mockAlertmanager, func()) { + ams := []*mockAlertmanager{} + for i := 0; i < numHappyAM; i++ { + ams = append(ams, newMockAlertmanager(i, true)) + } + for i := numHappyAM; i < numAM; i++ { + ams = append(ams, newMockAlertmanager(i, false)) + } + + // Use a real ring with a mock KV store to test ring RF logic. 
+ amDescs := map[string]ring.InstanceDesc{} + amByAddr := map[string]*mockAlertmanager{} + for i, a := range ams { + amDescs[a.myAddr] = ring.InstanceDesc{ + Addr: a.myAddr, + Zone: "", + State: ring.ACTIVE, + Timestamp: time.Now().Unix(), + RegisteredTimestamp: time.Now().Add(-2 * time.Hour).Unix(), + Tokens: []uint32{uint32((math.MaxUint32 / numAM) * i)}, + } + amByAddr[a.myAddr] = ams[i] + } + + kvStore := consul.NewInMemoryClient(ring.GetCodec()) + err := kvStore.CAS(context.Background(), RingKey, + func(_ interface{}) (interface{}, bool, error) { + return &ring.Desc{ + Ingesters: amDescs, + }, true, nil + }, + ) + require.NoError(t, err) + + amRing, err := ring.New(ring.Config{ + KVStore: kv.Config{ + Mock: kvStore, + }, + HeartbeatTimeout: 60 * time.Minute, + ReplicationFactor: replicationFactor, + }, RingNameForServer, RingKey, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), amRing)) + test.Poll(t, time.Second, numAM, func() interface{} { + return amRing.InstancesCount() + }) + + cfg := &MultitenantAlertmanagerConfig{} + flagext.DefaultValues(cfg) + + d, err := NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, amRing, newMockAlertmanagerClientFactory(amByAddr), util_log.Logger, prometheus.NewRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) + + return d, ams, func() { + require.NoError(t, services.StopAndAwaitTerminated(context.Background(), d)) + } +} + +type mockAlertmanager struct { + alertmanagerpb.AlertmanagerClient + grpc_health_v1.HealthClient + // receivedRequests is map of route -> statusCode -> number of requests. + receivedRequests map[string]map[int]int + mtx sync.Mutex + myAddr string + happy bool +} + +func newMockAlertmanager(idx int, happy bool) *mockAlertmanager { + return &mockAlertmanager{ + receivedRequests: make(map[string]map[int]int), + myAddr: fmt.Sprintf("127.0.0.1:%05d", 10000+idx), + happy: happy, + } +} + +func (am *mockAlertmanager) HandleRequest(_ context.Context, in *httpgrpc.HTTPRequest, _ ...grpc.CallOption) (*httpgrpc.HTTPResponse, error) { + am.mtx.Lock() + defer am.mtx.Unlock() + + u, err := url.Parse(in.Url) + if err != nil { + return nil, err + } + path := u.Path + m, ok := am.receivedRequests[path] + if !ok { + m = make(map[int]int) + am.receivedRequests[path] = m + } + + if am.happy { + m[http.StatusOK]++ + return &httpgrpc.HTTPResponse{ + Code: http.StatusOK, + Headers: []*httpgrpc.Header{ + { + Key: "Content-Type", + Values: []string{"it-is-ok"}, + }, { + Key: "X-Content-Type-Options", + Values: []string{"ok-option-1", "ok-option-2"}, + }, + }, + }, nil + } + + m[http.StatusInternalServerError]++ + return nil, httpgrpc.ErrorFromHTTPResponse(&httpgrpc.HTTPResponse{ + Code: http.StatusInternalServerError, + Headers: []*httpgrpc.Header{ + { + Key: "Content-Type", + Values: []string{"it-is-not-ok"}, + }, { + Key: "X-Content-Type-Options", + Values: []string{"not-ok-option-1", "not-ok-option-2"}, + }, + }, + }) +} + +func (am *mockAlertmanager) requestsCount(route string) int { + am.mtx.Lock() + defer am.mtx.Unlock() + + routeMap, ok := am.receivedRequests[route] + if !ok { + return 0 + } + + // The status could be something other than overall + // expected status because of quorum logic. 
+ reqCount := 0 + for _, count := range routeMap { + reqCount += count + } + return reqCount +} + +func (am *mockAlertmanager) Close() error { + return nil +} + +func (am *mockAlertmanager) RemoteAddress() string { + return am.myAddr +} + +type mockAlertmanagerClientFactory struct { + alertmanagerByAddr map[string]*mockAlertmanager +} + +func newMockAlertmanagerClientFactory(alertmanagerByAddr map[string]*mockAlertmanager) ClientsPool { + return &mockAlertmanagerClientFactory{alertmanagerByAddr: alertmanagerByAddr} +} + +func (f *mockAlertmanagerClientFactory) GetClientFor(addr string) (Client, error) { + c, ok := f.alertmanagerByAddr[addr] + if !ok { + return nil, errors.New("client not found") + } + return Client(c), nil +} diff --git a/pkg/alertmanager/multitenant.go b/pkg/alertmanager/multitenant.go index d74e5edfcc..27eea5417a 100644 --- a/pkg/alertmanager/multitenant.go +++ b/pkg/alertmanager/multitenant.go @@ -21,6 +21,8 @@ import ( amconfig "github.com/prometheus/alertmanager/config" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/weaveworks/common/httpgrpc" + "github.com/weaveworks/common/httpgrpc/server" "github.com/cortexproject/cortex/pkg/alertmanager/alerts" "github.com/cortexproject/cortex/pkg/ring" @@ -91,10 +93,11 @@ func init() { // MultitenantAlertmanagerConfig is the configuration for a multitenant Alertmanager. type MultitenantAlertmanagerConfig struct { - DataDir string `yaml:"data_dir"` - Retention time.Duration `yaml:"retention"` - ExternalURL flagext.URLValue `yaml:"external_url"` - PollInterval time.Duration `yaml:"poll_interval"` + DataDir string `yaml:"data_dir"` + Retention time.Duration `yaml:"retention"` + ExternalURL flagext.URLValue `yaml:"external_url"` + PollInterval time.Duration `yaml:"poll_interval"` + MaxRecvMsgSize int64 `yaml:"max_recv_msg_size"` DeprecatedClusterBindAddr string `yaml:"cluster_bind_address"` DeprecatedClusterAdvertiseAddr string `yaml:"cluster_advertise_address"` @@ -112,6 +115,9 @@ type MultitenantAlertmanagerConfig struct { Cluster ClusterConfig `yaml:"cluster"` EnableAPI bool `yaml:"enable_api"` + + // For distributor. + AlertmanagerClient ClientConfig `yaml:"alertmanager_client"` } type ClusterConfig struct { @@ -132,6 +138,7 @@ const ( func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.DataDir, "alertmanager.storage.path", "data/", "Base path for data storage.") f.DurationVar(&cfg.Retention, "alertmanager.storage.retention", 5*24*time.Hour, "How long to keep data for.") + f.Int64Var(&cfg.MaxRecvMsgSize, "alertmanager.max-recv-msg-size", 16<<20, "Maximum size (bytes) of an accepted HTTP request body.") f.Var(&cfg.ExternalURL, "alertmanager.web.external-url", "The URL under which Alertmanager is externally reachable (for example, if Alertmanager is served via a reverse proxy). Used for generating relative and absolute links back to Alertmanager itself. If the URL has a path portion, it will be used to prefix all HTTP endpoints served by Alertmanager. 
If omitted, relevant URL components will be derived automatically.") @@ -150,6 +157,8 @@ func (cfg *MultitenantAlertmanagerConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ShardingEnabled, "alertmanager.sharding-enabled", false, "Shard tenants across multiple alertmanager instances.") + cfg.AlertmanagerClient.RegisterFlagsWithPrefix("alertmanager.alertmanager-client", f) + cfg.ShardingRing.RegisterFlags(f) cfg.Store.RegisterFlags(f) cfg.Cluster.RegisterFlags(f) @@ -231,8 +240,15 @@ type MultitenantAlertmanager struct { cfg *MultitenantAlertmanagerConfig // Ring used for sharding alertmanager instances. + // When sharding is disabled, the flow is: + // ServeHTTP() -> serveRequest() + // When sharding is enabled: + // ServeHTTP() -> distributor.DistributeRequest() -> (sends to other AM or even the current) + // -> HandleRequest() (gRPC call) -> grpcServer() -> handlerForGRPCServer.ServeHTTP() -> serveRequest(). ringLifecycler *ring.BasicLifecycler ring *ring.Ring + distributor *Distributor + grpcServer *server.Server // Subservices manager (ring, lifecycler) subservices *services.Manager @@ -400,6 +416,13 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC if am.registry != nil { am.registry.MustRegister(am.ring) } + + am.grpcServer = server.NewServer(&handlerForGRPCServer{am: am}) + + am.distributor, err = NewDistributor(cfg.AlertmanagerClient, cfg.MaxRecvMsgSize, am.ring, nil, log.With(logger, "component", "AlertmanagerDistributor"), am.registry) + if err != nil { + return nil, errors.Wrap(err, "create distributor") + } } if registerer != nil { @@ -411,6 +434,16 @@ func createMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, fallbackC return am, nil } +// handlerForGRPCServer acts as a handler for gRPC server to serve +// the serveRequest() via the standard ServeHTTP. +type handlerForGRPCServer struct { + am *MultitenantAlertmanager +} + +func (h *handlerForGRPCServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { + h.am.serveRequest(w, req) +} + func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) { defer func() { if err == nil || am.subservices == nil { @@ -423,7 +456,7 @@ func (am *MultitenantAlertmanager) starting(ctx context.Context) (err error) { }() if am.cfg.ShardingEnabled { - if am.subservices, err = services.NewManager(am.ringLifecycler, am.ring); err != nil { + if am.subservices, err = services.NewManager(am.ringLifecycler, am.ring, am.distributor); err != nil { return errors.Wrap(err, "failed to start alertmanager's subservices") } @@ -738,6 +771,23 @@ func (am *MultitenantAlertmanager) ServeHTTP(w http.ResponseWriter, req *http.Re return } + if am.cfg.ShardingEnabled && am.distributor.IsPathSupported(req.URL.Path) { + am.distributor.DistributeRequest(w, req) + return + } + + // If sharding is not enabled or Distributor does not support this path, + // it is served by this instance. + am.serveRequest(w, req) +} + +// HandleRequest implements gRPC Alertmanager service, which receives request from AlertManager-Distributor. +func (am *MultitenantAlertmanager) HandleRequest(ctx context.Context, in *httpgrpc.HTTPRequest) (*httpgrpc.HTTPResponse, error) { + return am.grpcServer.Handle(ctx, in) +} + +// serveRequest serves the Alertmanager's web UI and API. 
+func (am *MultitenantAlertmanager) serveRequest(w http.ResponseWriter, req *http.Request) { userID, err := tenant.TenantID(req.Context()) if err != nil { http.Error(w, err.Error(), http.StatusUnauthorized) @@ -748,7 +798,6 @@ func (am *MultitenantAlertmanager) ServeHTTP(w http.ResponseWriter, req *http.Re am.alertmanagersMtx.Unlock() if ok { - userAM.mux.ServeHTTP(w, req) return } diff --git a/pkg/api/api.go b/pkg/api/api.go index 23f81049ac..7c797a24dd 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -17,6 +17,7 @@ import ( "github.com/weaveworks/common/server" "github.com/cortexproject/cortex/pkg/alertmanager" + "github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb" "github.com/cortexproject/cortex/pkg/chunk/purger" "github.com/cortexproject/cortex/pkg/compactor" "github.com/cortexproject/cortex/pkg/distributor" @@ -158,7 +159,9 @@ func (a *API) RegisterRoutesWithPrefix(prefix string, handler http.Handler, auth // RegisterAlertmanager registers endpoints associated with the alertmanager. It will only // serve endpoints using the legacy http-prefix if it is not run as a single binary. -func (a *API) RegisterAlertmanager(am *alertmanager.MultitenantAlertmanager, target, apiEnabled bool) { +func (a *API) RegisterAlertmanager(am *alertmanager.MultitenantAlertmanager, amCfg alertmanager.MultitenantAlertmanagerConfig, target, apiEnabled bool) { + alertmanagerpb.RegisterAlertmanagerServer(a.server.GRPC, am) + a.indexPage.AddLink(SectionAdminEndpoints, "/multitenant_alertmanager/status", "Alertmanager Status") a.indexPage.AddLink(SectionAdminEndpoints, "/multitenant_alertmanager/ring", "Alertmanager Ring Status") // Ensure this route is registered before the prefixed AM route diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 92dd351fe5..a85ed37144 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -698,14 +698,14 @@ func (t *Cortex) initConfig() (serv services.Service, err error) { } func (t *Cortex) initAlertManager() (serv services.Service, err error) { - t.Cfg.Alertmanager.ShardingRing.ListenPort = t.Cfg.Server.HTTPListenPort + t.Cfg.Alertmanager.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort t.Alertmanager, err = alertmanager.NewMultitenantAlertmanager(&t.Cfg.Alertmanager, util_log.Logger, prometheus.DefaultRegisterer) if err != nil { return } - t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.isModuleEnabled(AlertManager), t.Cfg.Alertmanager.EnableAPI) + t.API.RegisterAlertmanager(t.Alertmanager, t.Cfg.Alertmanager, t.Cfg.isModuleEnabled(AlertManager), t.Cfg.Alertmanager.EnableAPI) return t.Alertmanager, nil } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 0ae008c125..a1f1bc42b4 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -153,7 +153,7 @@ type Distributor struct { activeUsers *util.ActiveUsers } -// Config contains the configuration require to +// Config contains the configuration required to // create a Distributor type Config struct { PoolConfig PoolConfig `yaml:"pool"` diff --git a/pkg/frontend/transport/handler.go b/pkg/frontend/transport/handler.go index 2b10065bca..7660c1e9cb 100644 --- a/pkg/frontend/transport/handler.go +++ b/pkg/frontend/transport/handler.go @@ -22,6 +22,7 @@ import ( querier_stats "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" ) @@ -201,7 +202,7 @@ func writeError(w http.ResponseWriter, err 
error) { case context.DeadlineExceeded: err = errDeadlineExceeded default: - if strings.Contains(err.Error(), "http: request body too large") { + if util.IsRequestBodyTooLarge(err) { err = errRequestEntityTooLarge } } diff --git a/pkg/util/http.go b/pkg/util/http.go index f06363e537..5e0f176391 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -20,6 +20,11 @@ import ( const messageSizeLargerErrFmt = "received message larger than max (%d vs %d)" +// IsRequestBodyTooLarge returns true if the error is "http: request body too large". +func IsRequestBodyTooLarge(err error) bool { + return err != nil && strings.Contains(err.Error(), "http: request body too large") +} + // BasicAuth configures basic authentication for HTTP clients. type BasicAuth struct { Username string `yaml:"basic_auth_username"` diff --git a/pkg/util/http_test.go b/pkg/util/http_test.go index d2430ec6c9..26e445990c 100644 --- a/pkg/util/http_test.go +++ b/pkg/util/http_test.go @@ -4,6 +4,8 @@ import ( "bytes" "context" "html/template" + "io/ioutil" + "net/http" "net/http/httptest" "testing" @@ -157,3 +159,8 @@ func (b bytesBuffered) Close() error { func (b bytesBuffered) BytesBuffer() *bytes.Buffer { return b.Buffer } + +func TestIsRequestBodyTooLargeRegression(t *testing.T) { + _, err := ioutil.ReadAll(http.MaxBytesReader(httptest.NewRecorder(), ioutil.NopCloser(bytes.NewReader([]byte{1, 2, 3, 4})), 1)) + assert.True(t, util.IsRequestBodyTooLarge(err)) +}
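As a footnote to the new `util.IsRequestBodyTooLarge` helper and its regression test above: `http.MaxBytesReader` reports that the limit was hit only through an error whose message is "http: request body too large", which is why both the alertmanager distributor and the query-frontend handler match on that string. A minimal, self-contained sketch of the pattern (the local `isRequestBodyTooLarge` copy merely stands in for the real helper):

```go
package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"strings"
)

// isRequestBodyTooLarge stands in for util.IsRequestBodyTooLarge: before Go 1.19
// introduced http.MaxBytesError, matching the error message was the only portable
// way to detect that MaxBytesReader cut the body off.
func isRequestBodyTooLarge(err error) bool {
	return err != nil && strings.Contains(err.Error(), "http: request body too large")
}

func main() {
	// Read a 4-byte body through a 1-byte MaxBytesReader, mirroring
	// TestIsRequestBodyTooLargeRegression above.
	w := httptest.NewRecorder()
	body := ioutil.NopCloser(bytes.NewReader([]byte{1, 2, 3, 4}))
	_, err := ioutil.ReadAll(http.MaxBytesReader(w, body, 1))

	fmt.Println(isRequestBodyTooLarge(err)) // true
	fmt.Println(isRequestBodyTooLarge(nil)) // false
}
```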