From f5bb9548aafdac95ec0e1e6b85ab8c6983108526 Mon Sep 17 00:00:00 2001 From: Ruben Vargas Date: Thu, 9 Sep 2021 03:52:23 -0500 Subject: [PATCH] Migrate elasticsearch rollover to go (#3242) * Index rollover golang migration, init action Signed-off-by: Ruben Vargas * Add rollover action to es-rollover cmd Signed-off-by: Ruben Vargas * Add lookback command Signed-off-by: Ruben Vargas * Fix usage of commands, add host positional arg Signed-off-by: Ruben Vargas * Rename some variables, move some fuctions to pkg/es for reuse it Signed-off-by: Ruben Vargas * Reuse code, move commands to main Signed-off-by: Ruben Vargas * Move filters to pkg/es Signed-off-by: Ruben Vargas * Simplified the es index client code Signed-off-by: Ruben Vargas * Initialie subcommand configs Signed-off-by: Ruben Vargas * Add comments to exported methods and types Signed-off-by: Ruben Vargas * Remove spaces, rename some functions to align it more with the functionality Signed-off-by: Ruben Vargas * Change how to init and handle global variables Signed-off-by: Ruben Vargas * Fix default unit for lookback command Signed-off-by: Ruben Vargas * Validate positional argument Signed-off-by: Ruben Vargas * Separate ILM into his own client api Signed-off-by: Ruben Vargas * Fix index client delete test Signed-off-by: Ruben Vargas * Add index client tests Signed-off-by: Ruben Vargas * Add flags tests Signed-off-by: Ruben Vargas * Index filter tests Signed-off-by: Ruben Vargas * Add index option tests Signed-off-by: Ruben Vargas * fix filter tests Signed-off-by: Ruben Vargas * Actions unit tests Signed-off-by: Ruben Vargas * test rollover action Signed-off-by: Ruben Vargas * Init action tests Signed-off-by: Ruben Vargas * Add lookback tests Signed-off-by: Ruben Vargas * Minor code style changes Signed-off-by: Ruben Vargas * remove empty_test.go from mocks package Signed-off-by: Ruben Vargas * Bind correct root flags Signed-off-by: Ruben Vargas * Return response errors on index client methods if its possible Signed-off-by: Ruben Vargas * Fix some code styles, extra spaces etc.. Signed-off-by: Ruben Vargas * Rename addRootFlags function Signed-off-by: Ruben Vargas --- cmd/es-index-cleaner/app/index_client.go | 144 ----- cmd/es-index-cleaner/app/index_client_test.go | 214 -------- cmd/es-index-cleaner/app/index_filter.go | 21 +- cmd/es-index-cleaner/app/index_filter_test.go | 20 +- cmd/es-index-cleaner/main.go | 11 +- cmd/es-rollover/app/actions.go | 74 +++ cmd/es-rollover/app/actions_test.go | 124 +++++ cmd/es-rollover/app/flags.go | 65 +++ cmd/es-rollover/app/flags_test.go | 54 ++ cmd/es-rollover/app/index_options.go | 67 +++ cmd/es-rollover/app/index_options_test.go | 118 +++++ cmd/es-rollover/app/init/action.go | 156 ++++++ cmd/es-rollover/app/init/action_test.go | 251 +++++++++ cmd/es-rollover/app/init/flags.go | 47 ++ cmd/es-rollover/app/init/flags_test.go | 45 ++ cmd/es-rollover/app/lookback/action.go | 70 +++ cmd/es-rollover/app/lookback/action_test.go | 152 ++++++ cmd/es-rollover/app/lookback/flags.go | 49 ++ cmd/es-rollover/app/lookback/flags_test.go | 45 ++ .../app/lookback/time_reference.go | 41 ++ .../app/lookback/time_reference_test.go | 83 +++ cmd/es-rollover/app/rollover/action.go | 71 +++ cmd/es-rollover/app/rollover/action_test.go | 112 ++++ cmd/es-rollover/app/rollover/flags.go | 44 ++ cmd/es-rollover/app/rollover/flags_test.go | 43 ++ cmd/es-rollover/main.go | 167 ++++++ pkg/es/client/basic_auth.go | 25 + pkg/es/client/basic_auth_test.go | 50 ++ pkg/es/client/client.go | 118 +++++ pkg/es/client/cluster_client.go | 65 +++ pkg/es/client/cluster_client_test.go | 204 ++++++++ pkg/es/client/ilm_client.go | 47 ++ pkg/es/client/ilm_client_test.go | 78 +++ pkg/es/client/index_client.go | 255 +++++++++ pkg/es/client/index_client_test.go | 495 ++++++++++++++++++ pkg/es/client/interfaces.go | 33 ++ pkg/es/client/mocks/cluter_client.go | 28 + pkg/es/client/mocks/ilm_client.go | 26 + pkg/es/client/mocks/index_client.go | 54 ++ pkg/es/filter/alias.go | 70 +++ pkg/es/filter/alias_test.go | 116 ++++ pkg/es/filter/date.go | 32 ++ pkg/es/filter/date_test.go | 93 ++++ 43 files changed, 3692 insertions(+), 385 deletions(-) delete mode 100644 cmd/es-index-cleaner/app/index_client.go delete mode 100644 cmd/es-index-cleaner/app/index_client_test.go create mode 100644 cmd/es-rollover/app/actions.go create mode 100644 cmd/es-rollover/app/actions_test.go create mode 100644 cmd/es-rollover/app/flags.go create mode 100644 cmd/es-rollover/app/flags_test.go create mode 100644 cmd/es-rollover/app/index_options.go create mode 100644 cmd/es-rollover/app/index_options_test.go create mode 100644 cmd/es-rollover/app/init/action.go create mode 100644 cmd/es-rollover/app/init/action_test.go create mode 100644 cmd/es-rollover/app/init/flags.go create mode 100644 cmd/es-rollover/app/init/flags_test.go create mode 100644 cmd/es-rollover/app/lookback/action.go create mode 100644 cmd/es-rollover/app/lookback/action_test.go create mode 100644 cmd/es-rollover/app/lookback/flags.go create mode 100644 cmd/es-rollover/app/lookback/flags_test.go create mode 100644 cmd/es-rollover/app/lookback/time_reference.go create mode 100644 cmd/es-rollover/app/lookback/time_reference_test.go create mode 100644 cmd/es-rollover/app/rollover/action.go create mode 100644 cmd/es-rollover/app/rollover/action_test.go create mode 100644 cmd/es-rollover/app/rollover/flags.go create mode 100644 cmd/es-rollover/app/rollover/flags_test.go create mode 100644 cmd/es-rollover/main.go create mode 100644 pkg/es/client/basic_auth.go create mode 100644 pkg/es/client/basic_auth_test.go create mode 100644 pkg/es/client/client.go create mode 100644 pkg/es/client/cluster_client.go create mode 100644 pkg/es/client/cluster_client_test.go create mode 100644 pkg/es/client/ilm_client.go create mode 100644 pkg/es/client/ilm_client_test.go create mode 100644 pkg/es/client/index_client.go create mode 100644 pkg/es/client/index_client_test.go create mode 100644 pkg/es/client/interfaces.go create mode 100644 pkg/es/client/mocks/cluter_client.go create mode 100644 pkg/es/client/mocks/ilm_client.go create mode 100644 pkg/es/client/mocks/index_client.go create mode 100644 pkg/es/filter/alias.go create mode 100644 pkg/es/filter/alias_test.go create mode 100644 pkg/es/filter/date.go create mode 100644 pkg/es/filter/date_test.go diff --git a/cmd/es-index-cleaner/app/index_client.go b/cmd/es-index-cleaner/app/index_client.go deleted file mode 100644 index 3a523bc0243..00000000000 --- a/cmd/es-index-cleaner/app/index_client.go +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright (c) 2021 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "net/http" - "strconv" - "time" -) - -// Index represents ES index. -type Index struct { - // Index name. - Index string - // Index creation time. - CreationTime time.Time - // Aliases - Aliases map[string]bool -} - -// IndicesClient is a client used to manipulate indices. -type IndicesClient struct { - // Http client. - Client *http.Client - // ES server endpoint. - Endpoint string - // ES master_timeout parameter. - MasterTimeoutSeconds int - BasicAuth string -} - -// GetJaegerIndices queries all Jaeger indices including the archive and rollover. -// Jaeger daily indices are: -// jaeger-span-2019-01-01, jaeger-service-2019-01-01, jaeger-dependencies-2019-01-01 -// jaeger-span-archive -// Rollover indices: -// aliases: jaeger-span-read, jaeger-span-write, jaeger-service-read, jaeger-service-write -// indices: jaeger-span-000001, jaeger-service-000001 etc. -// aliases: jaeger-span-archive-read, jaeger-span-archive-write -// indices: jaeger-span-archive-000001 -func (i *IndicesClient) GetJaegerIndices(prefix string) ([]Index, error) { - prefix += "jaeger-*" - r, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s?flat_settings=true&filter_path=*.aliases,*.settings", i.Endpoint, prefix), nil) - if err != nil { - return nil, err - } - i.setAuthorization(r) - res, err := i.Client.Do(r) - if err != nil { - return nil, fmt.Errorf("failed to query indices: %w", err) - } - - if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("failed to query indices: %w", handleFailedRequest(res)) - } - - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, fmt.Errorf("failed to query indices and read response body: %w", err) - } - - type indexInfo struct { - Aliases map[string]interface{} `json:"aliases"` - Settings map[string]string `json:"settings"` - } - var indicesInfo map[string]indexInfo - if err = json.Unmarshal(body, &indicesInfo); err != nil { - return nil, fmt.Errorf("failed to query indices and unmarshall response body: %q: %w", body, err) - } - - var indices []Index - for k, v := range indicesInfo { - aliases := map[string]bool{} - for alias := range v.Aliases { - aliases[alias] = true - } - // ignoring error, ES should return valid date - creationDate, _ := strconv.ParseInt(v.Settings["index.creation_date"], 10, 64) - - indices = append(indices, Index{ - Index: k, - CreationTime: time.Unix(0, int64(time.Millisecond)*creationDate), - Aliases: aliases, - }) - } - return indices, nil -} - -// DeleteIndices deletes specified set of indices. -func (i *IndicesClient) DeleteIndices(indices []Index) error { - concatIndices := "" - for _, i := range indices { - concatIndices += i.Index - concatIndices += "," - } - - r, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/%s?master_timeout=%ds", i.Endpoint, concatIndices, i.MasterTimeoutSeconds), nil) - if err != nil { - return err - } - i.setAuthorization(r) - - res, err := i.Client.Do(r) - if err != nil { - return fmt.Errorf("failed to delete indices: %w", err) - } - if res.StatusCode != http.StatusOK { - return fmt.Errorf("failed to delete indices: %s, %w", concatIndices, handleFailedRequest(res)) - } - return nil -} - -func handleFailedRequest(res *http.Response) error { - var body string - if res.Body != nil { - bodyBytes, err := ioutil.ReadAll(res.Body) - if err != nil { - return fmt.Errorf("request failed and failed to read response body, status code: %d, %w", res.StatusCode, err) - } - body = string(bodyBytes) - } - return fmt.Errorf("request failed, status code: %d, body: %s", res.StatusCode, body) -} - -func (i *IndicesClient) setAuthorization(r *http.Request) { - if i.BasicAuth != "" { - r.Header.Add("Authorization", fmt.Sprintf("Basic %s", i.BasicAuth)) - } -} diff --git a/cmd/es-index-cleaner/app/index_client_test.go b/cmd/es-index-cleaner/app/index_client_test.go deleted file mode 100644 index 7c7db3829ba..00000000000 --- a/cmd/es-index-cleaner/app/index_client_test.go +++ /dev/null @@ -1,214 +0,0 @@ -// Copyright (c) 2021 The Jaeger Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package app - -import ( - "net/http" - "net/http/httptest" - "sort" - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -const esIndexResponse = ` -{ - "jaeger-service-2021-08-06" : { - "aliases" : { }, - "settings" : { - "index.creation_date" : "1628259381266", - "index.mapper.dynamic" : "false", - "index.mapping.nested_fields.limit" : "50", - "index.number_of_replicas" : "1", - "index.number_of_shards" : "5", - "index.provided_name" : "jaeger-service-2021-08-06", - "index.requests.cache.enable" : "true", - "index.uuid" : "2kKdvrvAT7qXetRzmWhjYQ", - "index.version.created" : "5061099" - } - }, - "jaeger-span-2021-08-06" : { - "aliases" : { }, - "settings" : { - "index.creation_date" : "1628259381326", - "index.mapper.dynamic" : "false", - "index.mapping.nested_fields.limit" : "50", - "index.number_of_replicas" : "1", - "index.number_of_shards" : "5", - "index.provided_name" : "jaeger-span-2021-08-06", - "index.requests.cache.enable" : "true", - "index.uuid" : "zySRY_FfRFa5YMWxNsNViA", - "index.version.created" : "5061099" - } - }, - "jaeger-span-000001" : { - "aliases" : { - "jaeger-span-read" : { }, - "jaeger-span-write" : { } - }, - "settings" : { - "index.creation_date" : "1628259381326" - } - } -}` - -const esErrResponse = `{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"request [/jaeger-*] contains unrecognized parameter: [help]"}],"type":"illegal_argument_exception","reason":"request [/jaeger-*] contains unrecognized parameter: [help]"},"status":400}` - -func TestClientGetIndices(t *testing.T) { - tests := []struct { - name string - responseCode int - response string - errContains string - indices []Index - }{ - { - name: "no error", - responseCode: http.StatusOK, - response: esIndexResponse, - indices: []Index{ - { - Index: "jaeger-service-2021-08-06", - CreationTime: time.Unix(0, int64(time.Millisecond)*1628259381266), - Aliases: map[string]bool{}, - }, - { - Index: "jaeger-span-000001", - CreationTime: time.Unix(0, int64(time.Millisecond)*1628259381326), - Aliases: map[string]bool{"jaeger-span-read": true, "jaeger-span-write": true}, - }, - { - Index: "jaeger-span-2021-08-06", - CreationTime: time.Unix(0, int64(time.Millisecond)*1628259381326), - Aliases: map[string]bool{}, - }, - }, - }, - { - name: "client error", - responseCode: http.StatusBadRequest, - response: esErrResponse, - errContains: "failed to query indices: request failed, status code: 400", - }, - { - name: "unmarshall error", - responseCode: http.StatusOK, - response: "AAA", - errContains: `failed to query indices and unmarshall response body: "AAA"`, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - res.WriteHeader(test.responseCode) - res.Write([]byte(test.response)) - })) - defer testServer.Close() - - c := &IndicesClient{ - Client: testServer.Client(), - Endpoint: testServer.URL, - } - - indices, err := c.GetJaegerIndices("") - if test.errContains != "" { - require.Error(t, err) - assert.Contains(t, err.Error(), test.errContains) - assert.Nil(t, indices) - } else { - require.NoError(t, err) - sort.Slice(indices, func(i, j int) bool { - return strings.Compare(indices[i].Index, indices[j].Index) < 0 - }) - assert.Equal(t, test.indices, indices) - } - }) - } -} - -func TestClientDeleteIndices(t *testing.T) { - tests := []struct { - name string - responseCode int - response string - errContains string - }{ - { - name: "no error", - responseCode: http.StatusOK, - }, - { - name: "client error", - responseCode: http.StatusBadRequest, - response: esErrResponse, - errContains: "ailed to delete indices: jaeger-span", - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - - testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { - assert.True(t, strings.Contains(req.URL.String(), "jaeger-span")) - assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) - res.WriteHeader(test.responseCode) - res.Write([]byte(test.response)) - })) - defer testServer.Close() - - c := &IndicesClient{ - Client: testServer.Client(), - Endpoint: testServer.URL, - BasicAuth: "foobar", - } - - err := c.DeleteIndices([]Index{ - { - Index: "jaeger-span", - }, - }) - - if test.errContains != "" { - require.Error(t, err) - assert.Contains(t, err.Error(), test.errContains) - } - }) - } -} - -func TestClientRequestError(t *testing.T) { - c := &IndicesClient{ - Endpoint: "%", - } - err := c.DeleteIndices([]Index{}) - require.Error(t, err) - indices, err := c.GetJaegerIndices("") - require.Error(t, err) - assert.Nil(t, indices) -} - -func TestClientDoError(t *testing.T) { - c := &IndicesClient{ - Endpoint: "localhost:1", - Client: &http.Client{}, - } - err := c.DeleteIndices([]Index{}) - require.Error(t, err) - indices, err := c.GetJaegerIndices("") - require.Error(t, err) - assert.Nil(t, indices) -} diff --git a/cmd/es-index-cleaner/app/index_filter.go b/cmd/es-index-cleaner/app/index_filter.go index a01582730d6..e7695984a6e 100644 --- a/cmd/es-index-cleaner/app/index_filter.go +++ b/cmd/es-index-cleaner/app/index_filter.go @@ -18,6 +18,9 @@ import ( "fmt" "regexp" "time" + + "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/filter" ) // IndexFilter holds configuration for index filtering. @@ -35,12 +38,12 @@ type IndexFilter struct { } // Filter filters indices. -func (i *IndexFilter) Filter(indices []Index) []Index { +func (i *IndexFilter) Filter(indices []client.Index) []client.Index { indices = i.filter(indices) - return i.filterByDate(indices) + return filter.ByDate(indices, i.DeleteBeforeThisDate) } -func (i *IndexFilter) filter(indices []Index) []Index { +func (i *IndexFilter) filter(indices []client.Index) []client.Index { var reg *regexp.Regexp if i.Archive { // archive works only for rollover @@ -51,7 +54,7 @@ func (i *IndexFilter) filter(indices []Index) []Index { reg, _ = regexp.Compile(fmt.Sprintf("^%sjaeger-(span|service|dependencies)-\\d{4}%s\\d{2}%s\\d{2}", i.IndexPrefix, i.IndexDateSeparator, i.IndexDateSeparator)) } - var filtered []Index + var filtered []client.Index for _, in := range indices { if reg.MatchString(in.Index) { // index in write alias cannot be removed @@ -63,13 +66,3 @@ func (i *IndexFilter) filter(indices []Index) []Index { } return filtered } - -func (i *IndexFilter) filterByDate(indices []Index) []Index { - var filtered []Index - for _, in := range indices { - if in.CreationTime.Before(i.DeleteBeforeThisDate) { - filtered = append(filtered, in) - } - } - return filtered -} diff --git a/cmd/es-index-cleaner/app/index_filter_test.go b/cmd/es-index-cleaner/app/index_filter_test.go index 0f2a58920f5..4accb3ab092 100644 --- a/cmd/es-index-cleaner/app/index_filter_test.go +++ b/cmd/es-index-cleaner/app/index_filter_test.go @@ -19,6 +19,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/pkg/es/client" ) func TestIndexFilter(t *testing.T) { @@ -31,7 +33,7 @@ func TestIndexFilter_prefix(t *testing.T) { func testIndexFilter(t *testing.T, prefix string) { time20200807 := time.Date(2020, time.August, 06, 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1) - indices := []Index{ + indices := []client.Index{ { Index: prefix + "jaeger-span-2020-08-06", CreationTime: time.Date(2020, time.August, 06, 15, 0, 0, 0, time.UTC), @@ -147,7 +149,7 @@ func testIndexFilter(t *testing.T, prefix string) { tests := []struct { name string filter *IndexFilter - expected []Index + expected []client.Index }{ { name: "normal indices, remove older than 2 days", @@ -168,7 +170,7 @@ func testIndexFilter(t *testing.T, prefix string) { Rollover: false, DeleteBeforeThisDate: time20200807.Add(-time.Hour * 24 * time.Duration(1)), }, - expected: []Index{ + expected: []client.Index{ { Index: prefix + "jaeger-span-2020-08-05", CreationTime: time.Date(2020, time.August, 05, 15, 0, 0, 0, time.UTC), @@ -195,7 +197,7 @@ func testIndexFilter(t *testing.T, prefix string) { Rollover: false, DeleteBeforeThisDate: time20200807.Add(-time.Hour * 24 * time.Duration(0)), }, - expected: []Index{ + expected: []client.Index{ { Index: prefix + "jaeger-span-2020-08-06", CreationTime: time.Date(2020, time.August, 06, 15, 0, 0, 0, time.UTC), @@ -237,7 +239,7 @@ func testIndexFilter(t *testing.T, prefix string) { Rollover: false, DeleteBeforeThisDate: time20200807.Add(-time.Hour * 24 * time.Duration(1)), }, - expected: []Index{ + expected: []client.Index{ { Index: prefix + "jaeger-span-archive-000001", CreationTime: time.Date(2020, time.August, 5, 15, 0, 0, 0, time.UTC), @@ -256,7 +258,7 @@ func testIndexFilter(t *testing.T, prefix string) { Rollover: true, DeleteBeforeThisDate: time20200807.Add(-time.Hour * 24 * time.Duration(1)), }, - expected: []Index{ + expected: []client.Index{ { Index: prefix + "jaeger-span-000001", CreationTime: time.Date(2020, time.August, 05, 15, 0, 0, 0, time.UTC), @@ -282,7 +284,7 @@ func testIndexFilter(t *testing.T, prefix string) { Rollover: true, DeleteBeforeThisDate: time20200807.Add(-time.Hour * 24 * time.Duration(0)), }, - expected: []Index{ + expected: []client.Index{ { Index: prefix + "jaeger-span-000001", CreationTime: time.Date(2020, time.August, 05, 15, 0, 0, 0, time.UTC), @@ -308,7 +310,7 @@ func testIndexFilter(t *testing.T, prefix string) { Rollover: true, DeleteBeforeThisDate: time20200807.Add(-time.Hour * 24 * time.Duration(1)), }, - expected: []Index{ + expected: []client.Index{ { Index: prefix + "jaeger-span-archive-000001", CreationTime: time.Date(2020, time.August, 05, 15, 0, 0, 0, time.UTC), @@ -327,7 +329,7 @@ func testIndexFilter(t *testing.T, prefix string) { Rollover: true, DeleteBeforeThisDate: time20200807.Add(-time.Hour * 24 * time.Duration(0)), }, - expected: []Index{ + expected: []client.Index{ { Index: prefix + "jaeger-span-archive-000001", CreationTime: time.Date(2020, time.August, 05, 15, 0, 0, 0, time.UTC), diff --git a/cmd/es-index-cleaner/main.go b/cmd/es-index-cleaner/main.go index e38f15330e8..3e24b14dbb4 100644 --- a/cmd/es-index-cleaner/main.go +++ b/cmd/es-index-cleaner/main.go @@ -29,6 +29,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/es-index-cleaner/app" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/es/client" ) func main() { @@ -65,11 +66,13 @@ func main() { TLSClientConfig: tlsCfg, }, } - i := app.IndicesClient{ - Client: c, - Endpoint: args[1], + i := client.IndicesClient{ + Client: client.Client{ + Endpoint: args[1], + Client: c, + BasicAuth: basicAuth(cfg.Username, cfg.Password), + }, MasterTimeoutSeconds: cfg.MasterNodeTimeoutSeconds, - BasicAuth: basicAuth(cfg.Username, cfg.Password), } indices, err := i.GetJaegerIndices(cfg.IndexPrefix) diff --git a/cmd/es-rollover/app/actions.go b/cmd/es-rollover/app/actions.go new file mode 100644 index 00000000000..2186bd68884 --- /dev/null +++ b/cmd/es-rollover/app/actions.go @@ -0,0 +1,74 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "crypto/tls" + "net/http" + "time" + + "github.com/spf13/viper" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/es/client" +) + +func newESClient(endpoint string, cfg *Config, tlsCfg *tls.Config) client.Client { + httpClient := &http.Client{ + Timeout: time.Duration(cfg.Timeout) * time.Second, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: tlsCfg, + }, + } + return client.Client{ + Endpoint: endpoint, + Client: httpClient, + BasicAuth: client.BasicAuth(cfg.Username, cfg.Password), + } +} + +// Action is an interface that each action (init, rollover and lookback) of the es-rollover should implement +type Action interface { + Do() error +} + +// ActionExecuteOptions are the options passed to the execute action function +type ActionExecuteOptions struct { + Args []string + Viper *viper.Viper + Logger *zap.Logger + TLSFlags tlscfg.ClientFlagsConfig +} + +// ActionCreatorFunction type is the function type in charge of create the action to be executed +type ActionCreatorFunction func(client.Client, Config) Action + +// ExecuteAction execute the action returned by the createAction function +func ExecuteAction(opts ActionExecuteOptions, createAction ActionCreatorFunction) error { + cfg := Config{} + cfg.InitFromViper(opts.Viper) + tlsOpts := opts.TLSFlags.InitFromViper(opts.Viper) + tlsCfg, err := tlsOpts.Config(opts.Logger) + if err != nil { + return err + } + defer tlsOpts.Close() + + esClient := newESClient(opts.Args[0], &cfg, tlsCfg) + action := createAction(esClient, cfg) + return action.Do() +} diff --git a/cmd/es-rollover/app/actions_test.go b/cmd/es-rollover/app/actions_test.go new file mode 100644 index 00000000000..b1fd51c3238 --- /dev/null +++ b/cmd/es-rollover/app/actions_test.go @@ -0,0 +1,124 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "errors" + "flag" + "net/http" + "testing" + + "github.com/crossdock/crossdock-go/assert" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/es/client" +) + +var errActionTest = errors.New("action error") + +type dummyAction struct { + TestFn func() error +} + +func (a *dummyAction) Do() error { + return a.TestFn() +} + +func TestExecuteAction(t *testing.T) { + + tests := []struct { + name string + flags []string + expectedExecuteAction bool + expectedSkip bool + expectedError error + actionFunction func() error + configError bool + }{ + { + name: "execute errored action", + flags: []string{ + "--es.tls.skip-host-verify=true", + }, + expectedExecuteAction: true, + expectedSkip: true, + expectedError: errActionTest, + }, + { + name: "execute success action", + flags: []string{ + "--es.tls.skip-host-verify=true", + }, + expectedExecuteAction: true, + expectedSkip: true, + expectedError: nil, + }, + { + name: "don't action because error in tls options", + flags: []string{ + "--es.tls.cert=/invalid/path/for/cert", + }, + expectedExecuteAction: false, + configError: true, + }, + } + logger := zap.NewNop() + args := []string{ + "https://localhost:9300", + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + v := viper.New() + tlsFlags := tlscfg.ClientFlagsConfig{Prefix: "es"} + command := cobra.Command{} + flags := &flag.FlagSet{} + tlsFlags.AddFlags(flags) + command.PersistentFlags().AddGoFlagSet(flags) + v.BindPFlags(command.PersistentFlags()) + err := command.ParseFlags(test.flags) + require.NoError(t, err) + executedAction := false + err = ExecuteAction(ActionExecuteOptions{ + Args: args, + Viper: v, + Logger: logger, + TLSFlags: tlsFlags, + }, func(c client.Client, cfg Config) Action { + assert.Equal(t, "https://localhost:9300", c.Endpoint) + transport, ok := c.Client.Transport.(*http.Transport) + require.True(t, ok) + assert.Equal(t, true, transport.TLSClientConfig.InsecureSkipVerify) + return &dummyAction{ + TestFn: func() error { + executedAction = true + return test.expectedError + }, + } + }) + + assert.Equal(t, test.expectedExecuteAction, executedAction) + if test.configError { + assert.Error(t, err) + } else { + assert.Equal(t, test.expectedError, err) + } + }) + } +} diff --git a/cmd/es-rollover/app/flags.go b/cmd/es-rollover/app/flags.go new file mode 100644 index 00000000000..c18b7221fb5 --- /dev/null +++ b/cmd/es-rollover/app/flags.go @@ -0,0 +1,65 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "flag" + + "github.com/spf13/viper" +) + +const ( + indexPrefix = "index-prefix" + archive = "archive" + username = "es.username" + password = "es.password" + useILM = "es.use-ilm" + ilmPolicyName = "es.ilm-policy-name" + timeout = "timeout" +) + +// Config holds the global configurations for the es rollover, common to all actions +type Config struct { + IndexPrefix string + Archive bool + Username string + Password string + TLSEnabled bool + ILMPolicyName string + UseILM bool + Timeout int +} + +// AddFlags adds flags +func AddFlags(flags *flag.FlagSet) { + flags.String(indexPrefix, "", "Index prefix") + flags.Bool(archive, false, "Handle archive indices") + flags.String(username, "", "The username required by storage") + flags.String(password, "", "The password required by storage") + flags.Bool(useILM, false, "Use ILM to manage jaeger indices") + flags.String(ilmPolicyName, "", "The name of the ILM policy to use if ILM is active") + flags.Int(timeout, 120, "Number of seconds to wait for master node response") +} + +// InitFromViper initializes config from viper.Viper. +func (c *Config) InitFromViper(v *viper.Viper) { + c.IndexPrefix = v.GetString(indexPrefix) + c.Archive = v.GetBool(archive) + c.Username = v.GetString(username) + c.Password = v.GetString(password) + c.ILMPolicyName = v.GetString(ilmPolicyName) + c.UseILM = v.GetBool(useILM) + c.Timeout = v.GetInt(timeout) +} diff --git a/cmd/es-rollover/app/flags_test.go b/cmd/es-rollover/app/flags_test.go new file mode 100644 index 00000000000..1b449f0006c --- /dev/null +++ b/cmd/es-rollover/app/flags_test.go @@ -0,0 +1,54 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "flag" + "testing" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBindFlags(t *testing.T) { + v := viper.New() + c := &Config{} + command := cobra.Command{} + flags := &flag.FlagSet{} + AddFlags(flags) + command.PersistentFlags().AddGoFlagSet(flags) + v.BindPFlags(command.PersistentFlags()) + + err := command.ParseFlags([]string{ + "--index-prefix=tenant1", + "--archive=true", + "--timeout=150", + "--es.username=admin", + "--es.password=qwerty123", + "--es.use-ilm=true", + "--es.ilm-policy-name=jaeger-ilm", + }) + require.NoError(t, err) + + c.InitFromViper(v) + assert.Equal(t, "tenant1", c.IndexPrefix) + assert.Equal(t, true, c.Archive) + assert.Equal(t, 150, c.Timeout) + assert.Equal(t, "admin", c.Username) + assert.Equal(t, "qwerty123", c.Password) + assert.Equal(t, "jaeger-ilm", c.ILMPolicyName) +} diff --git a/cmd/es-rollover/app/index_options.go b/cmd/es-rollover/app/index_options.go new file mode 100644 index 00000000000..5baee5269bb --- /dev/null +++ b/cmd/es-rollover/app/index_options.go @@ -0,0 +1,67 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "fmt" + "strings" +) + +const writeAliasFormat = "%s-write" +const readAliasFormat = "%s-read" +const rolloverIndexFormat = "%s-000001" + +// IndexOption holds the information for the indices to rollover +type IndexOption struct { + Prefix string + TemplateName string +} + +// RolloverIndices return an array of indices to rollover +func RolloverIndices(archive bool, prefix string) []IndexOption { + if archive { + return []IndexOption{ + { + Prefix: strings.TrimLeft(fmt.Sprintf("%s-jaeger-span-archive", prefix), "-"), + TemplateName: strings.TrimLeft(fmt.Sprintf("%s-jaeger-span", prefix), "-"), + }, + } + } + return []IndexOption{ + { + Prefix: strings.TrimLeft(fmt.Sprintf("%s-jaeger-span", prefix), "-"), + TemplateName: strings.TrimLeft(fmt.Sprintf("%s-jaeger-span", prefix), "-"), + }, + { + Prefix: strings.TrimLeft(fmt.Sprintf("%s-jaeger-service", prefix), "-"), + TemplateName: strings.TrimLeft(fmt.Sprintf("%s-jaeger-service", prefix), "-"), + }, + } +} + +// ReadAliasName returns read alias name of the index +func (i *IndexOption) ReadAliasName() string { + return fmt.Sprintf(readAliasFormat, i.Prefix) +} + +// WriteAliasName returns write alias name of the index +func (i *IndexOption) WriteAliasName() string { + return fmt.Sprintf(writeAliasFormat, i.Prefix) +} + +// InitialRolloverIndex returns the initial index rollover name +func (i *IndexOption) InitialRolloverIndex() string { + return fmt.Sprintf(rolloverIndexFormat, i.Prefix) +} diff --git a/cmd/es-rollover/app/index_options_test.go b/cmd/es-rollover/app/index_options_test.go new file mode 100644 index 00000000000..3c8c9d3c2f1 --- /dev/null +++ b/cmd/es-rollover/app/index_options_test.go @@ -0,0 +1,118 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "testing" + + "github.com/crossdock/crossdock-go/assert" +) + +func TestRolloverIndices(t *testing.T) { + type expectedValues struct { + prefix string + templateName string + readAliasName string + writeAliasName string + initialRolloverIndex string + } + + tests := []struct { + name string + archive bool + prefix string + expected []expectedValues + }{ + { + name: "Empty prefix", + expected: []expectedValues{ + { + prefix: "jaeger-span", + templateName: "jaeger-span", + readAliasName: "jaeger-span-read", + writeAliasName: "jaeger-span-write", + initialRolloverIndex: "jaeger-span-000001", + }, + { + prefix: "jaeger-service", + templateName: "jaeger-service", + readAliasName: "jaeger-service-read", + writeAliasName: "jaeger-service-write", + initialRolloverIndex: "jaeger-service-000001", + }, + }, + }, + { + name: "archive with prefix", + archive: true, + prefix: "mytenant", + expected: []expectedValues{ + { + prefix: "mytenant-jaeger-span-archive", + templateName: "mytenant-jaeger-span", + readAliasName: "mytenant-jaeger-span-archive-read", + writeAliasName: "mytenant-jaeger-span-archive-write", + initialRolloverIndex: "mytenant-jaeger-span-archive-000001", + }, + }, + }, + { + name: "archive empty prefix", + archive: true, + expected: []expectedValues{ + { + prefix: "jaeger-span-archive", + templateName: "jaeger-span", + readAliasName: "jaeger-span-archive-read", + writeAliasName: "jaeger-span-archive-write", + initialRolloverIndex: "jaeger-span-archive-000001", + }, + }, + }, + { + name: "with prefix", + prefix: "mytenant", + expected: []expectedValues{ + { + prefix: "mytenant-jaeger-span", + templateName: "mytenant-jaeger-span", + readAliasName: "mytenant-jaeger-span-read", + writeAliasName: "mytenant-jaeger-span-write", + initialRolloverIndex: "mytenant-jaeger-span-000001", + }, + { + prefix: "mytenant-jaeger-service", + templateName: "mytenant-jaeger-service", + readAliasName: "mytenant-jaeger-service-read", + writeAliasName: "mytenant-jaeger-service-write", + initialRolloverIndex: "mytenant-jaeger-service-000001", + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result := RolloverIndices(test.archive, test.prefix) + for i, r := range result { + assert.Equal(t, test.expected[i].prefix, r.Prefix) + assert.Equal(t, test.expected[i].templateName, r.TemplateName) + assert.Equal(t, test.expected[i].readAliasName, r.ReadAliasName()) + assert.Equal(t, test.expected[i].writeAliasName, r.WriteAliasName()) + assert.Equal(t, test.expected[i].initialRolloverIndex, r.InitialRolloverIndex()) + } + }) + } +} diff --git a/cmd/es-rollover/app/init/action.go b/cmd/es-rollover/app/init/action.go new file mode 100644 index 00000000000..f5db9cfebc4 --- /dev/null +++ b/cmd/es-rollover/app/init/action.go @@ -0,0 +1,156 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package init + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" + "github.com/jaegertracing/jaeger/pkg/es" + "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/filter" + "github.com/jaegertracing/jaeger/plugin/storage/es/mappings" +) + +const ilmVersionSupport = 7 + +// Action holds the configuration and clients for init action +type Action struct { + Config Config + ClusterClient client.ClusterAPI + IndicesClient client.IndexAPI + ILMClient client.IndexManagementLifecycleAPI +} + +func (c Action) getMapping(version uint, templateName string) (string, error) { + mappingBuilder := mappings.MappingBuilder{ + TemplateBuilder: es.TextTemplateBuilder{}, + Shards: int64(c.Config.Shards), + Replicas: int64(c.Config.Replicas), + IndexPrefix: c.Config.IndexPrefix, + UseILM: c.Config.UseILM, + ILMPolicyName: c.Config.ILMPolicyName, + EsVersion: version, + } + return mappingBuilder.GetMapping(templateName) +} + +// Do the init action +func (c Action) Do() error { + version, err := c.ClusterClient.Version() + if err != nil { + return err + } + if c.Config.UseILM { + if version == ilmVersionSupport { + policyExist, err := c.ILMClient.Exists(c.Config.ILMPolicyName) + if err != nil { + return err + } + if !policyExist { + return fmt.Errorf("ILM policy %s doesn't exist in Elasticsearch. Please create it and re-run init", c.Config.ILMPolicyName) + } + } else { + return fmt.Errorf("ILM is supported only for ES version 7+") + } + } + rolloverIndices := app.RolloverIndices(c.Config.Archive, c.Config.IndexPrefix) + for _, indexName := range rolloverIndices { + if err := c.init(version, indexName); err != nil { + return err + } + } + return nil +} + +func createIndexIfNotExist(c client.IndexAPI, index string) error { + err := c.CreateIndex(index) + if err != nil { + if esErr, ok := err.(client.ResponseError); ok { + if esErr.StatusCode != http.StatusBadRequest || esErr.Body == nil { + return esErr.Err + } + // check for the reason of the error + jsonError := map[string]interface{}{} + err := json.Unmarshal(esErr.Body, &jsonError) + if err != nil { + // return unmarshal error + return err + } + errorMap := jsonError["error"].(map[string]interface{}) + // check for reason, ignore already exist error + if strings.Contains("resource_already_exists_exception", errorMap["type"].(string)) { + return nil + } + } + // Return any other error unrelated to the response + return err + } + return nil +} + +func (c Action) init(version uint, indexset app.IndexOption) error { + mapping, err := c.getMapping(version, indexset.TemplateName) + if err != nil { + return err + } + + err = c.IndicesClient.CreateTemplate(mapping, indexset.TemplateName) + if err != nil { + return err + } + + index := indexset.InitialRolloverIndex() + err = createIndexIfNotExist(c.IndicesClient, index) + if err != nil { + return err + } + + jaegerIndices, err := c.IndicesClient.GetJaegerIndices(c.Config.IndexPrefix) + if err != nil { + return err + } + + readAlias := indexset.ReadAliasName() + writeAlias := indexset.WriteAliasName() + aliases := []client.Alias{} + + if !filter.AliasExists(jaegerIndices, readAlias) { + aliases = append(aliases, client.Alias{ + Index: index, + Name: readAlias, + IsWriteIndex: false, + }) + } + + if !filter.AliasExists(jaegerIndices, writeAlias) { + aliases = append(aliases, client.Alias{ + Index: index, + Name: writeAlias, + IsWriteIndex: true, + }) + } + + if len(aliases) > 0 { + err = c.IndicesClient.CreateAlias(aliases) + if err != nil { + return err + } + } + return nil +} diff --git a/cmd/es-rollover/app/init/action_test.go b/cmd/es-rollover/app/init/action_test.go new file mode 100644 index 00000000000..ca708c29be5 --- /dev/null +++ b/cmd/es-rollover/app/init/action_test.go @@ -0,0 +1,251 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package init + +import ( + "errors" + "net/http" + "strings" + "testing" + + "github.com/crossdock/crossdock-go/assert" + "github.com/stretchr/testify/mock" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" + "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/client/mocks" +) + +func TestIndexCreateIfNotExist(t *testing.T) { + const esErrResponse = `{"error":{"root_cause":[{"type":"resource_already_exists_exception","reason":"]"}],"type":"resource_already_exists_exception","reason":"request [/jaeger-*] contains unrecognized parameter: [help]"},"status":400}` + + tests := []struct { + name string + returnErr error + expectedErr error + containsError string + }{ + { + name: "success", + }, + { + name: "generic error", + returnErr: errors.New("may be an http error?"), + expectedErr: errors.New("may be an http error?"), + }, + { + name: "response error", + returnErr: client.ResponseError{ + Err: errors.New("x"), + StatusCode: http.StatusForbidden, + }, + expectedErr: errors.New("x"), + }, + { + name: "unmarshal error", + returnErr: client.ResponseError{ + Err: errors.New("x"), + StatusCode: http.StatusBadRequest, + Body: []byte("blablabla"), + }, + containsError: "invalid character", + }, + { + name: "existing error", + returnErr: client.ResponseError{ + Err: errors.New("x"), + StatusCode: http.StatusBadRequest, + Body: []byte(esErrResponse), + }, + expectedErr: nil, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + indexClient := &mocks.MockIndexAPI{} + indexClient.On("CreateIndex", "jaeger-span").Return(test.returnErr) + err := createIndexIfNotExist(indexClient, "jaeger-span") + if test.containsError != "" { + assert.True(t, strings.Contains(err.Error(), test.containsError)) + } else { + assert.Equal(t, test.expectedErr, err) + } + }) + } +} +func TestRolloverAction(t *testing.T) { + tests := []struct { + name string + setupCallExpectations func(indexClient *mocks.MockIndexAPI, clusterClient *mocks.MockClusterAPI, ilmClient *mocks.MockILMAPI) + config Config + expectedErr error + }{ + + { + name: "Unsupported version", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI, clusterClient *mocks.MockClusterAPI, ilmClient *mocks.MockILMAPI) { + clusterClient.On("Version").Return(uint(5), nil) + }, + config: Config{ + Config: app.Config{ + Archive: true, + UseILM: true, + }, + }, + expectedErr: errors.New("ILM is supported only for ES version 7+"), + }, + { + name: "error getting version", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI, clusterClient *mocks.MockClusterAPI, ilmClient *mocks.MockILMAPI) { + clusterClient.On("Version").Return(uint(0), errors.New("version error")) + }, + expectedErr: errors.New("version error"), + config: Config{ + Config: app.Config{ + Archive: true, + UseILM: true, + }, + }, + }, + { + name: "ilm doesnt exist", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI, clusterClient *mocks.MockClusterAPI, ilmClient *mocks.MockILMAPI) { + clusterClient.On("Version").Return(uint(7), nil) + ilmClient.On("Exists", "myilmpolicy").Return(false, nil) + }, + expectedErr: errors.New("ILM policy myilmpolicy doesn't exist in Elasticsearch. Please create it and re-run init"), + config: Config{ + Config: app.Config{ + Archive: true, + UseILM: true, + ILMPolicyName: "myilmpolicy", + }, + }, + }, + { + name: "fail get ilm policy", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI, clusterClient *mocks.MockClusterAPI, ilmClient *mocks.MockILMAPI) { + clusterClient.On("Version").Return(uint(7), nil) + ilmClient.On("Exists", "myilmpolicy").Return(false, errors.New("error getting ilm policy")) + }, + expectedErr: errors.New("error getting ilm policy"), + config: Config{ + Config: app.Config{ + Archive: true, + UseILM: true, + ILMPolicyName: "myilmpolicy", + }, + }, + }, + { + name: "fail to create template", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI, clusterClient *mocks.MockClusterAPI, ilmClient *mocks.MockILMAPI) { + clusterClient.On("Version").Return(uint(7), nil) + indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(errors.New("error creating template")) + }, + expectedErr: errors.New("error creating template"), + config: Config{ + Config: app.Config{ + Archive: true, + UseILM: false, + }, + }, + }, + { + name: "fail to get jaeger indices", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI, clusterClient *mocks.MockClusterAPI, ilmClient *mocks.MockILMAPI) { + clusterClient.On("Version").Return(uint(7), nil) + indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil) + indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil) + indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, errors.New("error getting jaeger indices")) + }, + expectedErr: errors.New("error getting jaeger indices"), + config: Config{ + Config: app.Config{ + Archive: true, + UseILM: false, + }, + }, + }, + { + name: "fail to create alias", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI, clusterClient *mocks.MockClusterAPI, ilmClient *mocks.MockILMAPI) { + clusterClient.On("Version").Return(uint(7), nil) + indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil) + indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil) + indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil) + indexClient.On("CreateAlias", []client.Alias{ + {Index: "jaeger-span-archive-000001", Name: "jaeger-span-archive-read", IsWriteIndex: false}, + {Index: "jaeger-span-archive-000001", Name: "jaeger-span-archive-write", IsWriteIndex: true}, + }).Return(errors.New("error creating aliases")) + }, + expectedErr: errors.New("error creating aliases"), + config: Config{ + Config: app.Config{ + Archive: true, + UseILM: false, + }, + }, + }, + { + name: "create rollover index", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI, clusterClient *mocks.MockClusterAPI, ilmClient *mocks.MockILMAPI) { + clusterClient.On("Version").Return(uint(7), nil) + indexClient.On("CreateTemplate", mock.Anything, "jaeger-span").Return(nil) + indexClient.On("CreateIndex", "jaeger-span-archive-000001").Return(nil) + indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil) + indexClient.On("CreateAlias", []client.Alias{ + {Index: "jaeger-span-archive-000001", Name: "jaeger-span-archive-read", IsWriteIndex: false}, + {Index: "jaeger-span-archive-000001", Name: "jaeger-span-archive-write", IsWriteIndex: true}, + }).Return(nil) + + }, + expectedErr: nil, + config: Config{ + Config: app.Config{ + Archive: true, + UseILM: false, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + indexClient := &mocks.MockIndexAPI{} + clusterClient := &mocks.MockClusterAPI{} + ilmClient := &mocks.MockILMAPI{} + initAction := Action{ + Config: test.config, + IndicesClient: indexClient, + ClusterClient: clusterClient, + ILMClient: ilmClient, + } + + test.setupCallExpectations(indexClient, clusterClient, ilmClient) + + err := initAction.Do() + if test.expectedErr != nil { + assert.Error(t, err) + assert.Equal(t, test.expectedErr, err) + } + + indexClient.AssertExpectations(t) + clusterClient.AssertExpectations(t) + ilmClient.AssertExpectations(t) + }) + } +} diff --git a/cmd/es-rollover/app/init/flags.go b/cmd/es-rollover/app/init/flags.go new file mode 100644 index 00000000000..763656a7b3c --- /dev/null +++ b/cmd/es-rollover/app/init/flags.go @@ -0,0 +1,47 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package init + +import ( + "flag" + + "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" +) + +const ( + shards = "shards" + replicas = "replicas" +) + +// Config holds configuration for index cleaner binary. +type Config struct { + app.Config + Shards int + Replicas int +} + +// AddFlags adds flags for TLS to the FlagSet. +func (c *Config) AddFlags(flags *flag.FlagSet) { + flags.Int(shards, 5, "Number of shards") + flags.Int(replicas, 1, "Number of replicas") +} + +// InitFromViper initializes config from viper.Viper. +func (c *Config) InitFromViper(v *viper.Viper) { + c.Shards = v.GetInt(shards) + c.Replicas = v.GetInt(replicas) +} diff --git a/cmd/es-rollover/app/init/flags_test.go b/cmd/es-rollover/app/init/flags_test.go new file mode 100644 index 00000000000..483a3ce9690 --- /dev/null +++ b/cmd/es-rollover/app/init/flags_test.go @@ -0,0 +1,45 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package init + +import ( + "flag" + "testing" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBindFlags(t *testing.T) { + v := viper.New() + c := &Config{} + command := cobra.Command{} + flags := &flag.FlagSet{} + c.AddFlags(flags) + command.PersistentFlags().AddGoFlagSet(flags) + v.BindPFlags(command.PersistentFlags()) + + err := command.ParseFlags([]string{ + "--shards=8", + "--replicas=16", + }) + require.NoError(t, err) + + c.InitFromViper(v) + assert.Equal(t, 8, c.Shards) + assert.Equal(t, 16, c.Replicas) +} diff --git a/cmd/es-rollover/app/lookback/action.go b/cmd/es-rollover/app/lookback/action.go new file mode 100644 index 00000000000..cfa782e2d6d --- /dev/null +++ b/cmd/es-rollover/app/lookback/action.go @@ -0,0 +1,70 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lookback + +import ( + "fmt" + "time" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" + "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/filter" +) + +var timeNow func() time.Time = time.Now + +// Action holds the configuration and clients for lookback action +type Action struct { + Config + IndicesClient client.IndexAPI +} + +// Do the lookback action +func (a *Action) Do() error { + rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.IndexPrefix) + for _, indexName := range rolloverIndices { + if err := a.lookback(indexName); err != nil { + return err + } + } + return nil +} + +func (a *Action) lookback(indexSet app.IndexOption) error { + jaegerIndex, err := a.IndicesClient.GetJaegerIndices(a.Config.IndexPrefix) + if err != nil { + return err + } + + readAliasName := indexSet.ReadAliasName() + readAliasIndices := filter.ByAlias(jaegerIndex, []string{readAliasName}) + excludedWriteIndex := filter.ByAliasExclude(readAliasIndices, []string{indexSet.WriteAliasName()}) + finalIndices := filter.ByDate(excludedWriteIndex, getTimeReference(timeNow(), a.Unit, a.UnitCount)) + + if len(finalIndices) == 0 { + return fmt.Errorf("no indices to remove from alias %s", readAliasName) + } + + aliases := make([]client.Alias, 0, len(finalIndices)) + + for _, index := range finalIndices { + aliases = append(aliases, client.Alias{ + Index: index.Index, + Name: readAliasName, + }) + } + + return a.IndicesClient.DeleteAlias(aliases) +} diff --git a/cmd/es-rollover/app/lookback/action_test.go b/cmd/es-rollover/app/lookback/action_test.go new file mode 100644 index 00000000000..d2af0804421 --- /dev/null +++ b/cmd/es-rollover/app/lookback/action_test.go @@ -0,0 +1,152 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lookback + +import ( + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" + "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/client/mocks" +) + +func TestLookBackAction(t *testing.T) { + nowTime := time.Date(2021, 10, 12, 10, 10, 10, 10, time.Local) + indices := []client.Index{ + { + Index: "jaeger-span-archive-0000", + Aliases: map[string]bool{ + "jaeger-span-archive-other-alias": true, + }, + CreationTime: time.Date(2021, 10, 10, 10, 10, 10, 10, time.Local), + }, + { + Index: "jaeger-span-archive-0001", + Aliases: map[string]bool{ + "jaeger-span-archive-read": true, + }, + CreationTime: time.Date(2021, 10, 10, 10, 10, 10, 10, time.Local), + }, + { + Index: "jaeger-span-archive-0002", + Aliases: map[string]bool{ + "jaeger-span-archive-read": true, + "jaeger-span-archive-write": true, + }, + CreationTime: time.Date(2021, 10, 11, 10, 10, 10, 10, time.Local), + }, + { + Index: "jaeger-span-archive-0002", + Aliases: map[string]bool{ + "jaeger-span-archive-read": true, + }, + CreationTime: nowTime, + }, + { + Index: "jaeger-span-archive-0004", + Aliases: map[string]bool{ + "jaeger-span-archive-read": true, + "jaeger-span-archive-write": true, + }, + CreationTime: nowTime, + }, + } + + timeNow = func() time.Time { + return nowTime + } + + tests := []struct { + name string + setupCallExpectations func(indexClient *mocks.MockIndexAPI) + config Config + expectedErr error + }{ + { + name: "success", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI) { + indexClient.On("GetJaegerIndices", "").Return(indices, nil) + indexClient.On("DeleteAlias", []client.Alias{ + { + Index: "jaeger-span-archive-0001", + Name: "jaeger-span-archive-read", + }, + }).Return(nil) + + }, + config: Config{ + Unit: "days", + UnitCount: 1, + Config: app.Config{ + Archive: true, + UseILM: true, + }, + }, + expectedErr: nil, + }, + { + name: "get indices error", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI) { + indexClient.On("GetJaegerIndices", "").Return(indices, errors.New("get indices error")) + }, + config: Config{ + Unit: "days", + UnitCount: 1, + Config: app.Config{ + Archive: true, + UseILM: true, + }, + }, + expectedErr: errors.New("get indices error"), + }, + { + name: "empty indices error", + setupCallExpectations: func(indexClient *mocks.MockIndexAPI) { + indexClient.On("GetJaegerIndices", "").Return([]client.Index{}, nil) + }, + config: Config{ + Unit: "days", + UnitCount: 1, + Config: app.Config{ + Archive: true, + UseILM: true, + }, + }, + expectedErr: errors.New("no indices to remove from alias jaeger-span-archive-read"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + indexClient := &mocks.MockIndexAPI{} + lookbackAction := Action{ + Config: test.config, + IndicesClient: indexClient, + } + + test.setupCallExpectations(indexClient) + + err := lookbackAction.Do() + if test.expectedErr != nil { + assert.Error(t, err) + assert.Equal(t, test.expectedErr, err) + } + }) + } +} diff --git a/cmd/es-rollover/app/lookback/flags.go b/cmd/es-rollover/app/lookback/flags.go new file mode 100644 index 00000000000..b68d03eb898 --- /dev/null +++ b/cmd/es-rollover/app/lookback/flags.go @@ -0,0 +1,49 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lookback + +import ( + "flag" + + "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" +) + +const ( + unit = "unit" + unitCount = "unit-count" + defaultUnit = "days" + defaultUnitCount = 1 +) + +// Config holds configuration for index cleaner binary. +type Config struct { + app.Config + Unit string + UnitCount int +} + +// AddFlags adds flags for TLS to the FlagSet. +func (c *Config) AddFlags(flags *flag.FlagSet) { + flags.String(unit, defaultUnit, "used with lookback to remove indices from read alias e.g, days, weeks, months, years") + flags.Int(unitCount, defaultUnitCount, "count of UNITs") +} + +// InitFromViper initializes config from viper.Viper. +func (c *Config) InitFromViper(v *viper.Viper) { + c.Unit = v.GetString(unit) + c.UnitCount = v.GetInt(unitCount) +} diff --git a/cmd/es-rollover/app/lookback/flags_test.go b/cmd/es-rollover/app/lookback/flags_test.go new file mode 100644 index 00000000000..82d607a7be9 --- /dev/null +++ b/cmd/es-rollover/app/lookback/flags_test.go @@ -0,0 +1,45 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lookback + +import ( + "flag" + "testing" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBindFlags(t *testing.T) { + v := viper.New() + c := &Config{} + command := cobra.Command{} + flags := &flag.FlagSet{} + c.AddFlags(flags) + command.PersistentFlags().AddGoFlagSet(flags) + v.BindPFlags(command.PersistentFlags()) + + err := command.ParseFlags([]string{ + "--unit=days", + "--unit-count=16", + }) + require.NoError(t, err) + + c.InitFromViper(v) + assert.Equal(t, "days", c.Unit) + assert.Equal(t, 16, c.UnitCount) +} diff --git a/cmd/es-rollover/app/lookback/time_reference.go b/cmd/es-rollover/app/lookback/time_reference.go new file mode 100644 index 00000000000..88b70f7b191 --- /dev/null +++ b/cmd/es-rollover/app/lookback/time_reference.go @@ -0,0 +1,41 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lookback + +import "time" + +func getTimeReference(currentTime time.Time, units string, unitCount int) time.Time { + switch units { + case "minutes": + return currentTime.Truncate(time.Minute).Add(-time.Duration(unitCount) * time.Minute) + case "hours": + return currentTime.Truncate(time.Hour).Add(-time.Duration(unitCount) * time.Hour) + case "days": + year, month, day := currentTime.Date() + tomorrowMidnight := time.Date(year, month, day, 0, 0, 0, 0, currentTime.Location()).AddDate(0, 0, 1) + return tomorrowMidnight.Add(-time.Hour * 24 * time.Duration(unitCount)) + case "weeks": + year, month, day := currentTime.Date() + tomorrowMidnight := time.Date(year, month, day, 0, 0, 0, 0, currentTime.Location()).AddDate(0, 0, 1) + return tomorrowMidnight.Add(-time.Hour * 24 * time.Duration(7*unitCount)) + case "months": + year, month, day := currentTime.Date() + return time.Date(year, month, day, 0, 0, 0, 0, currentTime.Location()).AddDate(0, -1*unitCount, 0) + case "years": + year, month, day := currentTime.Date() + return time.Date(year, month, day, 0, 0, 0, 0, currentTime.Location()).AddDate(-1*unitCount, 0, 0) + } + return currentTime.Truncate(time.Second).Add(-time.Duration(unitCount) * time.Second) +} diff --git a/cmd/es-rollover/app/lookback/time_reference_test.go b/cmd/es-rollover/app/lookback/time_reference_test.go new file mode 100644 index 00000000000..5ee8ef1cf10 --- /dev/null +++ b/cmd/es-rollover/app/lookback/time_reference_test.go @@ -0,0 +1,83 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package lookback + +import ( + "testing" + "time" + + "github.com/crossdock/crossdock-go/assert" +) + +func TestGetTimeReference(t *testing.T) { + now := time.Date(2021, time.October, 10, 10, 10, 10, 10, time.UTC) + + tests := []struct { + name string + unit string + unitCount int + expectedTime time.Time + }{ + { + name: "seconds unit", + unit: "seconds", + unitCount: 30, + expectedTime: time.Date(2021, time.October, 10, 10, 9, 40, 00, time.UTC), + }, + { + name: "minutes unit", + unit: "minutes", + unitCount: 30, + expectedTime: time.Date(2021, time.October, 10, 9, 40, 00, 00, time.UTC), + }, + { + name: "hours unit", + unit: "hours", + unitCount: 2, + expectedTime: time.Date(2021, time.October, 10, 8, 00, 00, 00, time.UTC), + }, + { + name: "days unit", + unit: "days", + unitCount: 2, + expectedTime: time.Date(2021, 10, 9, 0, 0, 0, 0, time.UTC), + }, + { + name: "weeks unit", + unit: "weeks", + unitCount: 2, + expectedTime: time.Date(2021, time.September, 27, 0, 0, 0, 0, time.UTC), + }, + { + name: "months unit", + unit: "months", + unitCount: 2, + expectedTime: time.Date(2021, time.August, 10, 0, 0, 0, 0, time.UTC), + }, + { + name: "years unit", + unit: "years", + unitCount: 2, + expectedTime: time.Date(2019, time.October, 10, 0, 0, 0, 0, time.UTC), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ref := getTimeReference(now, test.unit, test.unitCount) + assert.Equal(t, test.expectedTime, ref) + }) + } +} diff --git a/cmd/es-rollover/app/rollover/action.go b/cmd/es-rollover/app/rollover/action.go new file mode 100644 index 00000000000..7cb82ad7023 --- /dev/null +++ b/cmd/es-rollover/app/rollover/action.go @@ -0,0 +1,71 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rollover + +import ( + "encoding/json" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" + "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/filter" +) + +// Action holds the configuration and clients for rollover action +type Action struct { + Config + IndicesClient client.IndexAPI +} + +// Do the rollover action +func (a *Action) Do() error { + rolloverIndices := app.RolloverIndices(a.Config.Archive, a.Config.IndexPrefix) + for _, indexName := range rolloverIndices { + if err := a.rollover(indexName); err != nil { + return err + } + } + return nil +} + +func (a *Action) rollover(indexSet app.IndexOption) error { + conditionsMap := map[string]interface{}{} + if len(a.Conditions) > 0 { + err := json.Unmarshal([]byte(a.Config.Conditions), &conditionsMap) + if err != nil { + return err + } + } + + writeAlias := indexSet.WriteAliasName() + readAlias := indexSet.ReadAliasName() + err := a.IndicesClient.Rollover(writeAlias, conditionsMap) + if err != nil { + return err + } + jaegerIndex, err := a.IndicesClient.GetJaegerIndices(a.Config.IndexPrefix) + if err != nil { + return err + } + + indicesWithWriteAlias := filter.ByAlias(jaegerIndex, []string{writeAlias}) + aliases := make([]client.Alias, 0, len(indicesWithWriteAlias)) + for _, index := range indicesWithWriteAlias { + aliases = append(aliases, client.Alias{ + Index: index.Index, + Name: readAlias, + }) + } + return a.IndicesClient.CreateAlias(aliases) +} diff --git a/cmd/es-rollover/app/rollover/action_test.go b/cmd/es-rollover/app/rollover/action_test.go new file mode 100644 index 00000000000..0376270b95a --- /dev/null +++ b/cmd/es-rollover/app/rollover/action_test.go @@ -0,0 +1,112 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rollover + +import ( + "errors" + "testing" + + "github.com/crossdock/crossdock-go/assert" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" + "github.com/jaegertracing/jaeger/pkg/es/client" + "github.com/jaegertracing/jaeger/pkg/es/client/mocks" +) + +func TestRolloverAction(t *testing.T) { + readIndices := []client.Index{ + { + Index: "jaeger-read-span", + Aliases: map[string]bool{ + "jaeger-span-archive-write": true, + }, + }, + } + + aliasToCreate := []client.Alias{{Index: "jaeger-read-span", Name: "jaeger-span-archive-read", IsWriteIndex: false}} + tests := []struct { + name string + conditions string + unmarshalErrExpected bool + getJaegerIndicesErr error + rolloverErr error + createAliasErr error + expectedError bool + }{ + { + name: "success", + conditions: "{\"max_age\": \"2d\"}", + expectedError: false, + }, + { + name: "get jaeger indices error", + conditions: "{\"max_age\": \"2d\"}", + expectedError: true, + getJaegerIndicesErr: errors.New("unable to get indices"), + }, + { + name: "rollover error", + conditions: "{\"max_age\": \"2d\"}", + expectedError: true, + rolloverErr: errors.New("unable to rollover"), + }, + { + name: "create alias error", + conditions: "{\"max_age\": \"2d\"}", + expectedError: true, + createAliasErr: errors.New("unable to create alias"), + }, + { + name: "unmarshal conditions error", + conditions: "{\"max_age\" \"2d\"},", + unmarshalErrExpected: true, + createAliasErr: errors.New("unable to create alias"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + indexClient := &mocks.MockIndexAPI{} + + rolloverAction := Action{ + Config: Config{ + Conditions: test.conditions, + Config: app.Config{ + Archive: true, + }, + }, + IndicesClient: indexClient, + } + + if !test.unmarshalErrExpected { + if test.rolloverErr == nil { + indexClient.On("GetJaegerIndices", "").Return(readIndices, test.getJaegerIndicesErr) + if test.getJaegerIndicesErr == nil { + indexClient.On("CreateAlias", aliasToCreate).Return(test.createAliasErr) + } + } + indexClient.On("Rollover", "jaeger-span-archive-write", map[string]interface{}{"max_age": "2d"}).Return(test.rolloverErr) + } + + err := rolloverAction.Do() + if test.expectedError || test.unmarshalErrExpected { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + indexClient.AssertExpectations(t) + }) + } +} diff --git a/cmd/es-rollover/app/rollover/flags.go b/cmd/es-rollover/app/rollover/flags.go new file mode 100644 index 00000000000..2e5cd66144d --- /dev/null +++ b/cmd/es-rollover/app/rollover/flags.go @@ -0,0 +1,44 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rollover + +import ( + "flag" + + "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" +) + +const ( + conditions = "conditions" + defaultRollbackCondition = "{\"max_age\": \"2d\"}" +) + +// Config holds configuration for index cleaner binary. +type Config struct { + app.Config + Conditions string +} + +// AddFlags adds flags for TLS to the FlagSet. +func (c *Config) AddFlags(flags *flag.FlagSet) { + flags.String(conditions, defaultRollbackCondition, "conditions used to rollover to a new write index") +} + +// InitFromViper initializes config from viper.Viper. +func (c *Config) InitFromViper(v *viper.Viper) { + c.Conditions = v.GetString(conditions) +} diff --git a/cmd/es-rollover/app/rollover/flags_test.go b/cmd/es-rollover/app/rollover/flags_test.go new file mode 100644 index 00000000000..3de8337862a --- /dev/null +++ b/cmd/es-rollover/app/rollover/flags_test.go @@ -0,0 +1,43 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rollover + +import ( + "flag" + "testing" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestBindFlags(t *testing.T) { + v := viper.New() + c := &Config{} + command := cobra.Command{} + flags := &flag.FlagSet{} + c.AddFlags(flags) + command.PersistentFlags().AddGoFlagSet(flags) + v.BindPFlags(command.PersistentFlags()) + + err := command.ParseFlags([]string{ + "--conditions={\"max_age\": \"20000d\"}", + }) + require.NoError(t, err) + + c.InitFromViper(v) + assert.Equal(t, "{\"max_age\": \"20000d\"}", c.Conditions) +} diff --git a/cmd/es-rollover/main.go b/cmd/es-rollover/main.go new file mode 100644 index 00000000000..8521ad1c8ee --- /dev/null +++ b/cmd/es-rollover/main.go @@ -0,0 +1,167 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "log" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/cmd/es-rollover/app" + initialize "github.com/jaegertracing/jaeger/cmd/es-rollover/app/init" + "github.com/jaegertracing/jaeger/cmd/es-rollover/app/lookback" + "github.com/jaegertracing/jaeger/cmd/es-rollover/app/rollover" + "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/es/client" +) + +func main() { + v := viper.New() + logger, _ := zap.NewProduction() + + var rootCmd = &cobra.Command{ + Use: "jaeger-es-rollover", + Short: "Jaeger es-rollover manages Jaeger indices", + Long: "Jaeger es-rollover manages Jaeger indices", + } + + tlsFlags := tlscfg.ClientFlagsConfig{Prefix: "es"} + + // Init command + initCfg := &initialize.Config{} + initCommand := &cobra.Command{ + Use: "init http://HOSTNAME:PORT", + Short: "creates indices and aliases", + Long: "creates indices and aliases", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + return app.ExecuteAction(app.ActionExecuteOptions{ + Args: args, + Viper: v, + Logger: logger, + TLSFlags: tlsFlags, + }, func(c client.Client, cfg app.Config) app.Action { + initCfg.Config = cfg + initCfg.InitFromViper(v) + indicesClient := &client.IndicesClient{ + Client: c, + MasterTimeoutSeconds: initCfg.Timeout, + } + clusterClient := &client.ClusterClient{ + Client: c, + } + ilmClient := &client.ILMClient{ + Client: c, + } + return &initialize.Action{ + IndicesClient: indicesClient, + ClusterClient: clusterClient, + ILMClient: ilmClient, + Config: *initCfg, + } + }) + }, + } + + // Rollover command + rolloverCfg := &rollover.Config{} + + rolloverCommand := &cobra.Command{ + Use: "rollover http://HOSTNAME:PORT", + Short: "rollover to new write index", + Long: "rollover to new write index", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + rolloverCfg.InitFromViper(v) + return app.ExecuteAction(app.ActionExecuteOptions{ + Args: args, + Viper: v, + Logger: logger, + TLSFlags: tlsFlags, + }, func(c client.Client, cfg app.Config) app.Action { + rolloverCfg.Config = cfg + rolloverCfg.InitFromViper(v) + indicesClient := &client.IndicesClient{ + Client: c, + MasterTimeoutSeconds: rolloverCfg.Timeout, + } + + return &rollover.Action{ + IndicesClient: indicesClient, + Config: *rolloverCfg, + } + }) + }, + } + + lookbackCfg := lookback.Config{} + lookbackCommand := &cobra.Command{ + Use: "lookback http://HOSTNAME:PORT", + Short: "removes old indices from read alias", + Long: "removes old indices from read alias", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + lookbackCfg.InitFromViper(v) + return app.ExecuteAction(app.ActionExecuteOptions{ + Args: args, + Viper: v, + Logger: logger, + TLSFlags: tlsFlags, + }, func(c client.Client, cfg app.Config) app.Action { + lookbackCfg.Config = cfg + lookbackCfg.InitFromViper(v) + indicesClient := &client.IndicesClient{ + Client: c, + MasterTimeoutSeconds: lookbackCfg.Timeout, + } + return &lookback.Action{ + IndicesClient: indicesClient, + Config: lookbackCfg, + } + }) + }, + } + + addPersistentFlags(v, rootCmd, tlsFlags.AddFlags, app.AddFlags) + addSubCommand(v, rootCmd, initCommand, initCfg.AddFlags) + addSubCommand(v, rootCmd, rolloverCommand, rolloverCfg.AddFlags) + addSubCommand(v, rootCmd, lookbackCommand, lookbackCfg.AddFlags) + + if err := rootCmd.Execute(); err != nil { + log.Fatalln(err) + } +} + +func addSubCommand(v *viper.Viper, rootCmd, cmd *cobra.Command, addFlags func(*flag.FlagSet)) { + rootCmd.AddCommand(cmd) + config.AddFlags( + v, + cmd, + addFlags, + ) +} + +func addPersistentFlags(v *viper.Viper, rootCmd *cobra.Command, inits ...func(*flag.FlagSet)) { + flagSet := new(flag.FlagSet) + for i := range inits { + inits[i](flagSet) + } + rootCmd.PersistentFlags().AddGoFlagSet(flagSet) + v.BindPFlags(rootCmd.PersistentFlags()) +} diff --git a/pkg/es/client/basic_auth.go b/pkg/es/client/basic_auth.go new file mode 100644 index 00000000000..49670b53279 --- /dev/null +++ b/pkg/es/client/basic_auth.go @@ -0,0 +1,25 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import "encoding/base64" + +// BasicAuth encode username and password to be used with basic authentication header +func BasicAuth(username, password string) string { + if username == "" || password == "" { + return "" + } + return base64.StdEncoding.EncodeToString([]byte(username + ":" + password)) +} diff --git a/pkg/es/client/basic_auth_test.go b/pkg/es/client/basic_auth_test.go new file mode 100644 index 00000000000..124b9ca3866 --- /dev/null +++ b/pkg/es/client/basic_auth_test.go @@ -0,0 +1,50 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBasicAuth(t *testing.T) { + tests := []struct { + name string + username string + password string + expectedResult string + }{ + { + name: "user and password", + username: "admin", + password: "qwerty123456", + expectedResult: "YWRtaW46cXdlcnR5MTIzNDU2", + }, + { + name: "username empty", + username: "", + password: "qwerty123456", + expectedResult: "", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result := BasicAuth(test.username, test.password) + assert.Equal(t, test.expectedResult, result) + + }) + } +} diff --git a/pkg/es/client/client.go b/pkg/es/client/client.go new file mode 100644 index 00000000000..04ff294e8b5 --- /dev/null +++ b/pkg/es/client/client.go @@ -0,0 +1,118 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" +) + +// ResponseError holds information about a request error +type ResponseError struct { + // Error returned by the http client + Err error + // StatusCode is the http code returned by the server (if any) + StatusCode int + // Body is the bytes readed in the response (if any) + Body []byte +} + +// Error returns the error string of the Err field +func (r ResponseError) Error() string { + return r.Err.Error() +} + +func (r ResponseError) prefixMessage(message string) ResponseError { + return ResponseError{ + Err: fmt.Errorf("%s, %w", message, r.Err), + StatusCode: r.StatusCode, + Body: r.Body, + } +} + +func newResponseError(err error, code int, body []byte) ResponseError { + return ResponseError{ + Err: err, + StatusCode: code, + Body: body, + } +} + +// Client is a generic client to make requests to ES +type Client struct { + // Http client. + Client *http.Client + // ES server endpoint. + Endpoint string + // Basic authentication string. + BasicAuth string +} + +type elasticRequest struct { + endpoint string + body []byte + method string +} + +func (c *Client) request(esRequest elasticRequest) ([]byte, error) { + var reader *bytes.Buffer + var r *http.Request + var err error + if len(esRequest.body) > 0 { + reader = bytes.NewBuffer(esRequest.body) + r, err = http.NewRequest(esRequest.method, fmt.Sprintf("%s/%s", c.Endpoint, esRequest.endpoint), reader) + } else { + r, err = http.NewRequest(esRequest.method, fmt.Sprintf("%s/%s", c.Endpoint, esRequest.endpoint), nil) + } + if err != nil { + return []byte{}, err + } + c.setAuthorization(r) + r.Header.Add("Content-Type", "application/json") + res, err := c.Client.Do(r) + if err != nil { + return []byte{}, err + } + + if res.StatusCode != http.StatusOK { + return []byte{}, c.handleFailedRequest(res) + } + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + return []byte{}, err + } + return body, nil +} + +func (c *Client) setAuthorization(r *http.Request) { + if c.BasicAuth != "" { + r.Header.Add("Authorization", fmt.Sprintf("Basic %s", c.BasicAuth)) + } +} + +func (c *Client) handleFailedRequest(res *http.Response) error { + if res.Body != nil { + bodyBytes, err := ioutil.ReadAll(res.Body) + if err != nil { + return newResponseError(fmt.Errorf("request failed and failed to read response body, status code: %d, %w", res.StatusCode, err), res.StatusCode, nil) + } + body := string(bodyBytes) + return newResponseError(fmt.Errorf("request failed, status code: %d, body: %s", res.StatusCode, body), res.StatusCode, bodyBytes) + } + return newResponseError(fmt.Errorf("request failed, status code: %d", res.StatusCode), res.StatusCode, nil) +} diff --git a/pkg/es/client/cluster_client.go b/pkg/es/client/cluster_client.go new file mode 100644 index 00000000000..dfa7df0a881 --- /dev/null +++ b/pkg/es/client/cluster_client.go @@ -0,0 +1,65 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" +) + +var _ ClusterAPI = (*ClusterClient)(nil) + +// ClusterClient is a client used to get ES cluster information +type ClusterClient struct { + Client +} + +// Version returns the major version of the ES cluster +func (c *ClusterClient) Version() (uint, error) { + type clusterInfo struct { + Version map[string]interface{} `json:"version"` + TagLine string `json:"tagline"` + } + body, err := c.request(elasticRequest{ + endpoint: "/", + method: http.MethodGet, + }) + + if err != nil { + return 0, err + } + var info clusterInfo + if err = json.Unmarshal(body, &info); err != nil { + return 0, err + } + + versionField := info.Version["number"] + versionNumber, isString := versionField.(string) + if !isString { + return 0, fmt.Errorf("invalid version format: %w", versionField) + } + version := strings.Split(versionNumber, ".") + major, err := strconv.ParseUint(version[0], 10, 32) + if err != nil { + return 0, fmt.Errorf("invalid version format: %s", version[0]) + } + if strings.Contains(info.TagLine, "OpenSearch") && major == 1 { + return 7, nil + } + return uint(major), nil +} diff --git a/pkg/es/client/cluster_client_test.go b/pkg/es/client/cluster_client_test.go new file mode 100644 index 00000000000..f979b794ada --- /dev/null +++ b/pkg/es/client/cluster_client_test.go @@ -0,0 +1,204 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const badVersionType = ` +{ + "name" : "opensearch-node1", + "cluster_name" : "opensearch-cluster", + "cluster_uuid" : "1StaUGrGSx61r41d-1nDiw", + "version" : { + "distribution" : "opensearch", + "number" : true, + "build_type" : "tar", + "build_hash" : "34550c5b17124ddc59458ef774f6b43a086522e3", + "build_date" : "2021-07-02T23:22:21.383695Z", + "build_snapshot" : false, + "lucene_version" : "8.8.2", + "minimum_wire_compatibility_version" : "6.8.0", + "minimum_index_compatibility_version" : "6.0.0-beta1" + }, + "tagline" : "The OpenSearch Project: https://opensearch.org/" + } +` + +const badVersionNoNumber = ` +{ + "name" : "opensearch-node1", + "cluster_name" : "opensearch-cluster", + "cluster_uuid" : "1StaUGrGSx61r41d-1nDiw", + "version" : { + "distribution" : "opensearch", + "number" : "thisisnotanumber", + "build_type" : "tar", + "build_hash" : "34550c5b17124ddc59458ef774f6b43a086522e3", + "build_date" : "2021-07-02T23:22:21.383695Z", + "build_snapshot" : false, + "lucene_version" : "8.8.2", + "minimum_wire_compatibility_version" : "6.8.0", + "minimum_index_compatibility_version" : "6.0.0-beta1" + }, + "tagline" : "The OpenSearch Project: https://opensearch.org/" + } +` + +const opensearchInfo = ` +{ + "name" : "opensearch-node1", + "cluster_name" : "opensearch-cluster", + "cluster_uuid" : "1StaUGrGSx61r41d-1nDiw", + "version" : { + "distribution" : "opensearch", + "number" : "1.0.0", + "build_type" : "tar", + "build_hash" : "34550c5b17124ddc59458ef774f6b43a086522e3", + "build_date" : "2021-07-02T23:22:21.383695Z", + "build_snapshot" : false, + "lucene_version" : "8.8.2", + "minimum_wire_compatibility_version" : "6.8.0", + "minimum_index_compatibility_version" : "6.0.0-beta1" + }, + "tagline" : "The OpenSearch Project: https://opensearch.org/" + } +` + +const elasticsearch7 = ` + +{ + "name" : "elasticsearch-0", + "cluster_name" : "clustername", + "cluster_uuid" : "HUtdg7bRTomSFaOk7Wzt8w", + "version" : { + "number" : "7.6.1", + "build_flavor" : "default", + "build_type" : "docker", + "build_hash" : "aa751e09be0a5072e8570670309b1f12348f023b", + "build_date" : "2020-02-29T00:15:25.529771Z", + "build_snapshot" : false, + "lucene_version" : "8.4.0", + "minimum_wire_compatibility_version" : "6.8.0", + "minimum_index_compatibility_version" : "6.0.0-beta1" + }, + "tagline" : "You Know, for Search" + } +` + +const elasticsearch6 = ` + +{ + "name" : "elasticsearch-0", + "cluster_name" : "clustername", + "cluster_uuid" : "HUtdg7bRTomSFaOk7Wzt8w", + "version" : { + "number" : "6.8.0", + "build_flavor" : "default", + "build_type" : "docker", + "build_hash" : "aa751e09be0a5072e8570670309b1f12348f023b", + "build_date" : "2020-02-29T00:15:25.529771Z", + "build_snapshot" : false, + "lucene_version" : "8.4.0", + "minimum_wire_compatibility_version" : "6.8.0", + "minimum_index_compatibility_version" : "6.0.0-beta1" + }, + "tagline" : "You Know, for Search" + } +` + +func TestVersion(t *testing.T) { + tests := []struct { + name string + responseCode int + response string + errContains string + expectedResult uint + }{ + { + name: "success with elasticsearch 6", + responseCode: http.StatusOK, + response: elasticsearch6, + expectedResult: 6, + }, + { + name: "success with elasticsearch 7", + responseCode: http.StatusOK, + response: elasticsearch7, + expectedResult: 7, + }, + { + name: "success with opensearch", + responseCode: http.StatusOK, + response: opensearchInfo, + expectedResult: 7, + }, + { + name: "client error", + responseCode: http.StatusBadRequest, + response: esErrResponse, + errContains: "request failed, status code: 400", + }, + { + name: "bad version", + responseCode: http.StatusOK, + response: badVersionType, + errContains: "invalid version format: %!w(bool=true)", + }, + { + name: "version not a number", + responseCode: http.StatusOK, + response: badVersionNoNumber, + errContains: "invalid version format: thisisnotanumber", + }, + { + name: "unmarshal error", + responseCode: http.StatusOK, + response: "thisisaninvalidjson", + errContains: "invalid character", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + assert.Equal(t, http.MethodGet, req.Method) + assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) + res.WriteHeader(test.responseCode) + res.Write([]byte(test.response)) + })) + defer testServer.Close() + + c := &ClusterClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + BasicAuth: "foobar", + }, + } + result, err := c.Version() + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + assert.Equal(t, test.expectedResult, result) + }) + } +} diff --git a/pkg/es/client/ilm_client.go b/pkg/es/client/ilm_client.go new file mode 100644 index 00000000000..5d06328fc23 --- /dev/null +++ b/pkg/es/client/ilm_client.go @@ -0,0 +1,47 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "fmt" + "net/http" +) + +var _ IndexManagementLifecycleAPI = (*ILMClient)(nil) + +// ILMClient is a client used to manipulate Index lifecycle management policies. +type ILMClient struct { + Client + MasterTimeoutSeconds int +} + +// Exists verify if a ILM policy exists +func (i ILMClient) Exists(name string) (bool, error) { + _, err := i.request(elasticRequest{ + endpoint: fmt.Sprintf("_ilm/policy/%s", name), + method: http.MethodGet, + }) + + if respError, isResponseErr := err.(ResponseError); isResponseErr { + if respError.StatusCode == http.StatusNotFound { + return false, nil + } + } + + if err != nil { + return false, fmt.Errorf("failed to get ILM policy: %s, %w", name, err) + } + return true, nil +} diff --git a/pkg/es/client/ilm_client_test.go b/pkg/es/client/ilm_client_test.go new file mode 100644 index 00000000000..c19a0caef2e --- /dev/null +++ b/pkg/es/client/ilm_client_test.go @@ -0,0 +1,78 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExists(t *testing.T) { + tests := []struct { + name string + responseCode int + response string + errContains string + expectedResult bool + }{ + { + name: "found", + responseCode: http.StatusOK, + expectedResult: true, + }, + { + name: "not found", + responseCode: http.StatusNotFound, + response: esErrResponse, + }, + { + name: "client error", + responseCode: http.StatusBadRequest, + response: esErrResponse, + errContains: "failed to get ILM policy: jaeger-ilm-policy", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + assert.True(t, strings.HasSuffix(req.URL.String(), "_ilm/policy/jaeger-ilm-policy")) + assert.Equal(t, http.MethodGet, req.Method) + assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) + res.WriteHeader(test.responseCode) + res.Write([]byte(test.response)) + })) + defer testServer.Close() + + c := &ILMClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + BasicAuth: "foobar", + }, + } + result, err := c.Exists("jaeger-ilm-policy") + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + assert.Equal(t, test.expectedResult, result) + }) + } +} diff --git a/pkg/es/client/index_client.go b/pkg/es/client/index_client.go new file mode 100644 index 00000000000..0f853c5386f --- /dev/null +++ b/pkg/es/client/index_client.go @@ -0,0 +1,255 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" +) + +// Index represents ES index. +type Index struct { + // Index name. + Index string + // Index creation time. + CreationTime time.Time + // Aliases + Aliases map[string]bool +} + +// Alias represents ES alias. +type Alias struct { + // Index name. + Index string + // Alias name. + Name string + // IsWritedIndex option + IsWriteIndex bool +} + +var _ IndexAPI = (*IndicesClient)(nil) + +// IndicesClient is a client used to manipulate indices. +type IndicesClient struct { + Client + MasterTimeoutSeconds int +} + +// GetJaegerIndices queries all Jaeger indices including the archive and rollover. +// Jaeger daily indices are: +// jaeger-span-2019-01-01, jaeger-service-2019-01-01, jaeger-dependencies-2019-01-01 +// jaeger-span-archive +// Rollover indices: +// aliases: jaeger-span-read, jaeger-span-write, jaeger-service-read, jaeger-service-write +// indices: jaeger-span-000001, jaeger-service-000001 etc. +// aliases: jaeger-span-archive-read, jaeger-span-archive-write +// indices: jaeger-span-archive-000001 +func (i *IndicesClient) GetJaegerIndices(prefix string) ([]Index, error) { + prefix += "jaeger-*" + + body, err := i.request(elasticRequest{ + endpoint: fmt.Sprintf("%s?flat_settings=true&filter_path=*.aliases,*.settings", prefix), + method: http.MethodGet, + }) + + if err != nil { + return nil, fmt.Errorf("failed to query indices: %w", err) + } + + type indexInfo struct { + Aliases map[string]interface{} `json:"aliases"` + Settings map[string]string `json:"settings"` + } + var indicesInfo map[string]indexInfo + if err = json.Unmarshal(body, &indicesInfo); err != nil { + return nil, fmt.Errorf("failed to query indices and unmarshall response body: %q: %w", body, err) + } + + var indices []Index + for k, v := range indicesInfo { + aliases := map[string]bool{} + for alias := range v.Aliases { + aliases[alias] = true + } + // ignoring error, ES should return valid date + creationDate, _ := strconv.ParseInt(v.Settings["index.creation_date"], 10, 64) + + indices = append(indices, Index{ + Index: k, + CreationTime: time.Unix(0, int64(time.Millisecond)*creationDate), + Aliases: aliases, + }) + } + return indices, nil +} + +// DeleteIndices deletes specified set of indices. +func (i *IndicesClient) DeleteIndices(indices []Index) error { + concatIndices := "" + for _, i := range indices { + concatIndices += i.Index + concatIndices += "," + } + _, err := i.request(elasticRequest{ + endpoint: fmt.Sprintf("%s?master_timeout=%ds", concatIndices, i.MasterTimeoutSeconds), + method: http.MethodDelete, + }) + + if err != nil { + if responseError, isResponseError := err.(ResponseError); isResponseError { + if responseError.StatusCode != http.StatusOK { + return responseError.prefixMessage(fmt.Sprintf("failed to delete indices: %s", concatIndices)) + } + } + return fmt.Errorf("failed to delete indices: %w", err) + } + return nil +} + +// CreateIndex an ES index +func (i *IndicesClient) CreateIndex(index string) error { + _, err := i.request(elasticRequest{ + endpoint: index, + method: http.MethodPut, + }) + if err != nil { + if responseError, isResponseError := err.(ResponseError); isResponseError { + if responseError.StatusCode != http.StatusOK { + return responseError.prefixMessage(fmt.Sprintf("failed to create index: %s", index)) + } + } + return fmt.Errorf("failed to create index: %w", err) + } + return nil +} + +// CreateAlias an ES specific set of index aliases +func (i *IndicesClient) CreateAlias(aliases []Alias) error { + err := i.aliasAction("add", aliases) + if err != nil { + if responseError, isResponseError := err.(ResponseError); isResponseError { + if responseError.StatusCode != http.StatusOK { + return responseError.prefixMessage(fmt.Sprintf("failed to create aliases: %s", i.aliasesString(aliases))) + } + } + return fmt.Errorf("failed to create aliases: %w", err) + } + return nil +} + +// DeleteAlias an ES specific set of index aliases +func (i *IndicesClient) DeleteAlias(aliases []Alias) error { + err := i.aliasAction("remove", aliases) + if err != nil { + if responseError, isResponseError := err.(ResponseError); isResponseError { + if responseError.StatusCode != http.StatusOK { + return responseError.prefixMessage(fmt.Sprintf("failed to delete aliases: %s", i.aliasesString(aliases))) + } + } + return fmt.Errorf("failed to delete aliases: %w", err) + } + return nil +} + +func (i *IndicesClient) aliasesString(aliases []Alias) string { + concatAliases := "" + for _, alias := range aliases { + concatAliases += fmt.Sprintf("[index: %s, alias: %s],", alias.Index, alias.Name) + } + return strings.Trim(concatAliases, ",") +} + +func (i *IndicesClient) aliasAction(action string, aliases []Alias) error { + actions := []map[string]interface{}{} + + for _, alias := range aliases { + options := map[string]interface{}{ + "index": alias.Index, + "alias": alias.Name, + } + if alias.IsWriteIndex { + options["is_write_index"] = true + } + actions = append(actions, map[string]interface{}{ + action: options, + }) + } + + body := map[string]interface{}{ + "actions": actions, + } + + bodyBytes, err := json.Marshal(body) + if err != nil { + return err + } + _, err = i.request(elasticRequest{ + endpoint: "_aliases", + method: http.MethodPost, + body: bodyBytes, + }) + + return err +} + +// CreateTemplate an ES index template +func (i IndicesClient) CreateTemplate(template, name string) error { + _, err := i.request(elasticRequest{ + endpoint: fmt.Sprintf("_template/%s", name), + method: http.MethodPut, + body: []byte(template), + }) + if err != nil { + if responseError, isResponseError := err.(ResponseError); isResponseError { + if responseError.StatusCode != http.StatusOK { + return responseError.prefixMessage(fmt.Sprintf("failed to create template: %s", name)) + } + } + return fmt.Errorf("failed to create template: %w", err) + } + return nil +} + +// Rollover create a rollover for certain index/alias +func (i IndicesClient) Rollover(rolloverTarget string, conditions map[string]interface{}) error { + esReq := elasticRequest{ + endpoint: fmt.Sprintf("%s/_rollover/", rolloverTarget), + method: http.MethodPost, + } + if len(conditions) > 0 { + body := map[string]interface{}{ + "conditions": conditions, + } + bodyBytes, err := json.Marshal(body) + if err != nil { + return err + } + esReq.body = bodyBytes + } + _, err := i.request(esReq) + if err != nil { + if responseError, isResponseError := err.(ResponseError); isResponseError { + if responseError.StatusCode != http.StatusOK { + return responseError.prefixMessage(fmt.Sprintf("failed to create rollover target: %s", rolloverTarget)) + } + } + return fmt.Errorf("failed to create rollover: %w", err) + } + return nil +} diff --git a/pkg/es/client/index_client_test.go b/pkg/es/client/index_client_test.go new file mode 100644 index 00000000000..02921fba348 --- /dev/null +++ b/pkg/es/client/index_client_test.go @@ -0,0 +1,495 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "sort" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const esIndexResponse = ` +{ + "jaeger-service-2021-08-06" : { + "aliases" : { }, + "settings" : { + "index.creation_date" : "1628259381266", + "index.mapper.dynamic" : "false", + "index.mapping.nested_fields.limit" : "50", + "index.number_of_replicas" : "1", + "index.number_of_shards" : "5", + "index.provided_name" : "jaeger-service-2021-08-06", + "index.requests.cache.enable" : "true", + "index.uuid" : "2kKdvrvAT7qXetRzmWhjYQ", + "index.version.created" : "5061099" + } + }, + "jaeger-span-2021-08-06" : { + "aliases" : { }, + "settings" : { + "index.creation_date" : "1628259381326", + "index.mapper.dynamic" : "false", + "index.mapping.nested_fields.limit" : "50", + "index.number_of_replicas" : "1", + "index.number_of_shards" : "5", + "index.provided_name" : "jaeger-span-2021-08-06", + "index.requests.cache.enable" : "true", + "index.uuid" : "zySRY_FfRFa5YMWxNsNViA", + "index.version.created" : "5061099" + } + }, + "jaeger-span-000001" : { + "aliases" : { + "jaeger-span-read" : { }, + "jaeger-span-write" : { } + }, + "settings" : { + "index.creation_date" : "1628259381326" + } + } +}` + +const esErrResponse = `{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"request [/jaeger-*] contains unrecognized parameter: [help]"}],"type":"illegal_argument_exception","reason":"request [/jaeger-*] contains unrecognized parameter: [help]"},"status":400}` + +func TestClientGetIndices(t *testing.T) { + tests := []struct { + name string + responseCode int + response string + errContains string + indices []Index + }{ + { + name: "no error", + responseCode: http.StatusOK, + response: esIndexResponse, + indices: []Index{ + { + Index: "jaeger-service-2021-08-06", + CreationTime: time.Unix(0, int64(time.Millisecond)*1628259381266), + Aliases: map[string]bool{}, + }, + { + Index: "jaeger-span-000001", + CreationTime: time.Unix(0, int64(time.Millisecond)*1628259381326), + Aliases: map[string]bool{"jaeger-span-read": true, "jaeger-span-write": true}, + }, + { + Index: "jaeger-span-2021-08-06", + CreationTime: time.Unix(0, int64(time.Millisecond)*1628259381326), + Aliases: map[string]bool{}, + }, + }, + }, + { + name: "client error", + responseCode: http.StatusBadRequest, + response: esErrResponse, + errContains: "failed to query indices: request failed, status code: 400", + }, + { + name: "unmarshall error", + responseCode: http.StatusOK, + response: "AAA", + errContains: `failed to query indices and unmarshall response body: "AAA"`, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + res.WriteHeader(test.responseCode) + res.Write([]byte(test.response)) + })) + defer testServer.Close() + + c := &IndicesClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + }, + } + + indices, err := c.GetJaegerIndices("") + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + assert.Nil(t, indices) + } else { + require.NoError(t, err) + sort.Slice(indices, func(i, j int) bool { + return strings.Compare(indices[i].Index, indices[j].Index) < 0 + }) + assert.Equal(t, test.indices, indices) + } + }) + } +} + +func TestClientDeleteIndices(t *testing.T) { + tests := []struct { + name string + responseCode int + response string + errContains string + }{ + { + name: "no error", + responseCode: http.StatusOK, + }, + { + name: "client error", + responseCode: http.StatusBadRequest, + response: esErrResponse, + errContains: "failed to delete indices: jaeger-span", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + assert.True(t, strings.Contains(req.URL.String(), "jaeger-span")) + assert.Equal(t, http.MethodDelete, req.Method) + assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) + res.WriteHeader(test.responseCode) + res.Write([]byte(test.response)) + })) + defer testServer.Close() + + c := &IndicesClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + BasicAuth: "foobar", + }, + } + + err := c.DeleteIndices([]Index{ + { + Index: "jaeger-span", + }, + }) + + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + }) + } +} + +func TestClientRequestError(t *testing.T) { + c := &IndicesClient{ + Client: Client{ + Endpoint: "%", + }, + } + err := c.DeleteIndices([]Index{}) + require.Error(t, err) + indices, err := c.GetJaegerIndices("") + require.Error(t, err) + assert.Nil(t, indices) +} + +func TestClientDoError(t *testing.T) { + c := &IndicesClient{ + Client: Client{ + Client: &http.Client{}, + Endpoint: "localhost:1", + }, + } + err := c.DeleteIndices([]Index{}) + require.Error(t, err) + indices, err := c.GetJaegerIndices("") + require.Error(t, err) + assert.Nil(t, indices) +} + +func TestClientCreateIndex(t *testing.T) { + indexName := "jaeger-span" + tests := []struct { + name string + responseCode int + response string + errContains string + }{ + { + name: "success", + responseCode: http.StatusOK, + }, + { + name: "client error", + responseCode: http.StatusBadRequest, + response: esErrResponse, + errContains: "failed to create index: jaeger-span", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + assert.True(t, strings.HasSuffix(req.URL.String(), "jaeger-span")) + assert.Equal(t, http.MethodPut, req.Method) + assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) + res.WriteHeader(test.responseCode) + res.Write([]byte(test.response)) + })) + defer testServer.Close() + + c := &IndicesClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + BasicAuth: "foobar", + }, + } + err := c.CreateIndex(indexName) + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + }) + } +} + +func TestClientCreateAliases(t *testing.T) { + aliases := []Alias{ + { + Index: "jaeger-span", + Name: "jaeger-span-read", + }, + { + Index: "jaeger-span", + Name: "jaeger-span-write", + IsWriteIndex: true, + }, + } + expectedRequestBody := `{"actions":[{"add":{"alias":"jaeger-span-read","index":"jaeger-span"}},{"add":{"alias":"jaeger-span-write","index":"jaeger-span","is_write_index":true}}]}` + tests := []struct { + name string + responseCode int + response string + errContains string + }{ + { + name: "success", + responseCode: http.StatusOK, + }, + { + name: "client error", + responseCode: http.StatusBadRequest, + response: esErrResponse, + errContains: "failed to create aliases: [index: jaeger-span, alias: jaeger-span-read],[index: jaeger-span, alias: jaeger-span-write]", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + assert.True(t, strings.HasSuffix(req.URL.String(), "_aliases")) + assert.Equal(t, http.MethodPost, req.Method) + assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) + body, err := ioutil.ReadAll(req.Body) + require.NoError(t, err) + assert.Equal(t, expectedRequestBody, string(body)) + res.WriteHeader(test.responseCode) + res.Write([]byte(test.response)) + })) + defer testServer.Close() + + c := &IndicesClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + BasicAuth: "foobar", + }, + } + err := c.CreateAlias(aliases) + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + }) + } +} + +func TestClientDeleteAliases(t *testing.T) { + aliases := []Alias{ + { + Index: "jaeger-span", + Name: "jaeger-span-read", + }, + { + Index: "jaeger-span", + Name: "jaeger-span-write", + IsWriteIndex: true, + }, + } + expectedRequestBody := `{"actions":[{"remove":{"alias":"jaeger-span-read","index":"jaeger-span"}},{"remove":{"alias":"jaeger-span-write","index":"jaeger-span","is_write_index":true}}]}` + tests := []struct { + name string + responseCode int + response string + errContains string + }{ + { + name: "success", + responseCode: http.StatusOK, + }, + { + name: "client error", + responseCode: http.StatusBadRequest, + response: esErrResponse, + errContains: "failed to delete aliases: [index: jaeger-span, alias: jaeger-span-read],[index: jaeger-span, alias: jaeger-span-write]", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + assert.True(t, strings.HasSuffix(req.URL.String(), "_aliases")) + assert.Equal(t, http.MethodPost, req.Method) + assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) + body, err := ioutil.ReadAll(req.Body) + require.NoError(t, err) + assert.Equal(t, expectedRequestBody, string(body)) + res.WriteHeader(test.responseCode) + res.Write([]byte(test.response)) + })) + defer testServer.Close() + + c := &IndicesClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + BasicAuth: "foobar", + }, + } + err := c.DeleteAlias(aliases) + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + }) + } +} + +func TestClientCreateTemplate(t *testing.T) { + templateName := "jaeger-template" + templateContent := "template content" + tests := []struct { + name string + responseCode int + response string + errContains string + }{ + { + name: "success", + responseCode: http.StatusOK, + }, + { + name: "client error", + responseCode: http.StatusBadRequest, + response: esErrResponse, + errContains: "failed to create template: jaeger-template", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + assert.True(t, strings.HasSuffix(req.URL.String(), "_template/jaeger-template")) + assert.Equal(t, http.MethodPut, req.Method) + assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) + body, err := ioutil.ReadAll(req.Body) + require.NoError(t, err) + assert.Equal(t, templateContent, string(body)) + + res.WriteHeader(test.responseCode) + res.Write([]byte(test.response)) + })) + defer testServer.Close() + + c := &IndicesClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + BasicAuth: "foobar", + }, + } + err := c.CreateTemplate(templateContent, templateName) + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + }) + } +} + +func TestRollover(t *testing.T) { + expectedRequestBody := "{\"conditions\":{\"max_age\":\"2d\"}}" + mapConditions := map[string]interface{}{ + "max_age": "2d", + } + + tests := []struct { + name string + responseCode int + response string + errContains string + }{ + { + name: "success", + responseCode: http.StatusOK, + }, + { + name: "client error", + responseCode: http.StatusBadRequest, + response: esErrResponse, + errContains: "failed to create rollover target: jaeger-span", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) { + assert.True(t, strings.HasSuffix(req.URL.String(), "jaeger-span/_rollover/")) + assert.Equal(t, http.MethodPost, req.Method) + assert.Equal(t, "Basic foobar", req.Header.Get("Authorization")) + body, err := ioutil.ReadAll(req.Body) + require.NoError(t, err) + assert.Equal(t, expectedRequestBody, string(body)) + + res.WriteHeader(test.responseCode) + res.Write([]byte(test.response)) + })) + defer testServer.Close() + + c := &IndicesClient{ + Client: Client{ + Client: testServer.Client(), + Endpoint: testServer.URL, + BasicAuth: "foobar", + }, + } + err := c.Rollover("jaeger-span", mapConditions) + if test.errContains != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), test.errContains) + } + }) + } +} diff --git a/pkg/es/client/interfaces.go b/pkg/es/client/interfaces.go new file mode 100644 index 00000000000..9ea6fcff31f --- /dev/null +++ b/pkg/es/client/interfaces.go @@ -0,0 +1,33 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package client + +type IndexAPI interface { + GetJaegerIndices(prefix string) ([]Index, error) + DeleteIndices(indices []Index) error + CreateIndex(index string) error + CreateAlias(aliases []Alias) error + DeleteAlias(aliases []Alias) error + CreateTemplate(template, name string) error + Rollover(rolloverTarget string, conditions map[string]interface{}) error +} + +type ClusterAPI interface { + Version() (uint, error) +} + +type IndexManagementLifecycleAPI interface { + Exists(name string) (bool, error) +} diff --git a/pkg/es/client/mocks/cluter_client.go b/pkg/es/client/mocks/cluter_client.go new file mode 100644 index 00000000000..37137b8ffde --- /dev/null +++ b/pkg/es/client/mocks/cluter_client.go @@ -0,0 +1,28 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import ( + "github.com/stretchr/testify/mock" +) + +type MockClusterAPI struct { + mock.Mock +} + +func (c *MockClusterAPI) Version() (uint, error) { + ret := c.Called() + return ret.Get(0).(uint), ret.Error(1) +} diff --git a/pkg/es/client/mocks/ilm_client.go b/pkg/es/client/mocks/ilm_client.go new file mode 100644 index 00000000000..0602f13a1bc --- /dev/null +++ b/pkg/es/client/mocks/ilm_client.go @@ -0,0 +1,26 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import "github.com/stretchr/testify/mock" + +type MockILMAPI struct { + mock.Mock +} + +func (c *MockILMAPI) Exists(name string) (bool, error) { + ret := c.Called(name) + return ret.Get(0).(bool), ret.Error(1) +} diff --git a/pkg/es/client/mocks/index_client.go b/pkg/es/client/mocks/index_client.go new file mode 100644 index 00000000000..1382b3e7f89 --- /dev/null +++ b/pkg/es/client/mocks/index_client.go @@ -0,0 +1,54 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocks + +import ( + "github.com/stretchr/testify/mock" + + "github.com/jaegertracing/jaeger/pkg/es/client" +) + +type MockIndexAPI struct { + mock.Mock +} + +func (c *MockIndexAPI) GetJaegerIndices(prefix string) ([]client.Index, error) { + ret := c.Called(prefix) + return ret.Get(0).([]client.Index), ret.Error(1) +} +func (c *MockIndexAPI) DeleteIndices(indices []client.Index) error { + ret := c.Called(indices) + return ret.Error(0) +} +func (c *MockIndexAPI) CreateIndex(index string) error { + ret := c.Called(index) + return ret.Error(0) +} +func (c *MockIndexAPI) CreateAlias(aliases []client.Alias) error { + ret := c.Called(aliases) + return ret.Error(0) +} +func (c *MockIndexAPI) DeleteAlias(aliases []client.Alias) error { + ret := c.Called(aliases) + return ret.Error(0) +} +func (c *MockIndexAPI) CreateTemplate(template, name string) error { + ret := c.Called(template, name) + return ret.Error(0) +} +func (c *MockIndexAPI) Rollover(rolloverTarget string, conditions map[string]interface{}) error { + ret := c.Called(rolloverTarget, conditions) + return ret.Error(0) +} diff --git a/pkg/es/filter/alias.go b/pkg/es/filter/alias.go new file mode 100644 index 00000000000..631271f784e --- /dev/null +++ b/pkg/es/filter/alias.go @@ -0,0 +1,70 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filter + +import ( + "github.com/jaegertracing/jaeger/pkg/es/client" +) + +// AliasExists check if the some of indices has a certain alias name +func AliasExists(indices []client.Index, aliasName string) bool { + aliases := ByAlias(indices, []string{aliasName}) + return len(aliases) > 0 +} + +// ByAlias filter indices that have an alias in the array of alias +func ByAlias(indices []client.Index, aliases []string) []client.Index { + return filterByAliasWithOptions(indices, aliases, false) +} + +// ByAliasExclude filter indices that doesn't have an alias in the array of alias +func ByAliasExclude(indices []client.Index, aliases []string) []client.Index { + return filterByAliasWithOptions(indices, aliases, true) +} + +func filterByAliasWithOptions(indices []client.Index, aliases []string, exclude bool) []client.Index { + var results []client.Index + for _, alias := range aliases { + for _, index := range indices { + hasAlias := index.Aliases[alias] + if hasAlias { + results = append(results, index) + } + } + } + if exclude { + return exlude(indices, results) + } + return results +} + +func exlude(indices []client.Index, exclusionList []client.Index) []client.Index { + excludedIndices := make([]client.Index, 0, len(indices)) + for _, idx := range indices { + if !contains(idx, exclusionList) { + excludedIndices = append(excludedIndices, idx) + } + } + return excludedIndices +} + +func contains(index client.Index, indexList []client.Index) bool { + for _, idx := range indexList { + if idx.Index == index.Index { + return true + } + } + return false +} diff --git a/pkg/es/filter/alias_test.go b/pkg/es/filter/alias_test.go new file mode 100644 index 00000000000..614c3c0fac0 --- /dev/null +++ b/pkg/es/filter/alias_test.go @@ -0,0 +1,116 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filter + +import ( + "testing" + + "github.com/crossdock/crossdock-go/assert" + + "github.com/jaegertracing/jaeger/pkg/es/client" +) + +var indices = []client.Index{ + { + Index: "jaeger-span-0001", + Aliases: map[string]bool{ + "jaeger-span-write": true, + "jaeger-span-read": true, + }, + }, + { + Index: "jaeger-span-0002", + Aliases: map[string]bool{ + "jaeger-span-read": true, + }, + }, + { + Index: "jaeger-span-0003", + Aliases: map[string]bool{ + "jaeger-span-read": true, + }, + }, + { + Index: "jaeger-span-0004", + Aliases: map[string]bool{ + "jaeger-span-other": true, + }, + }, + { + Index: "jaeger-span-0005", + Aliases: map[string]bool{ + "custom-alias": true, + }, + }, + { + Index: "jaeger-span-0006", + }, +} + +func TestByAlias(t *testing.T) { + filtered := ByAlias(indices, []string{"jaeger-span-read", "jaeger-span-other"}) + expected := []client.Index{ + { + Index: "jaeger-span-0001", + Aliases: map[string]bool{ + "jaeger-span-write": true, + "jaeger-span-read": true, + }, + }, + { + Index: "jaeger-span-0002", + Aliases: map[string]bool{ + "jaeger-span-read": true, + }, + }, + { + Index: "jaeger-span-0003", + Aliases: map[string]bool{ + "jaeger-span-read": true, + }, + }, + { + Index: "jaeger-span-0004", + Aliases: map[string]bool{ + "jaeger-span-other": true, + }, + }, + } + assert.Equal(t, expected, filtered) +} + +func TestByAliasExclude(t *testing.T) { + filtered := ByAliasExclude(indices, []string{"jaeger-span-read", "jaeger-span-other"}) + expected := []client.Index{ + { + Index: "jaeger-span-0005", + Aliases: map[string]bool{ + "custom-alias": true, + }, + }, + { + Index: "jaeger-span-0006", + }, + } + assert.Equal(t, expected, filtered) +} + +func TestHasAliasEmpty(t *testing.T) { + result := AliasExists(indices, "my-unexisting-alias") + assert.False(t, result) + + result = AliasExists(indices, "custom-alias") + assert.True(t, result) +} diff --git a/pkg/es/filter/date.go b/pkg/es/filter/date.go new file mode 100644 index 00000000000..b7f67531e29 --- /dev/null +++ b/pkg/es/filter/date.go @@ -0,0 +1,32 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filter + +import ( + "time" + + "github.com/jaegertracing/jaeger/pkg/es/client" +) + +// ByDate filter indices by creationTime, return indices that were created before certain date. +func ByDate(indices []client.Index, beforeThisDate time.Time) []client.Index { + var filtered []client.Index + for _, in := range indices { + if in.CreationTime.Before(beforeThisDate) { + filtered = append(filtered, in) + } + } + return filtered +} diff --git a/pkg/es/filter/date_test.go b/pkg/es/filter/date_test.go new file mode 100644 index 00000000000..946aa1b69ef --- /dev/null +++ b/pkg/es/filter/date_test.go @@ -0,0 +1,93 @@ +// Copyright (c) 2021 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package filter + +import ( + "testing" + "time" + + "github.com/crossdock/crossdock-go/assert" + + "github.com/jaegertracing/jaeger/pkg/es/client" +) + +func TestByDate(t *testing.T) { + beforeDateFilter := time.Date(2021, 10, 10, 12, 00, 00, 00, time.Local) + expectedIndices := []client.Index{ + { + Index: "jaeger-span-0006", + CreationTime: time.Date(2021, 7, 7, 7, 10, 10, 10, time.Local), + }, + { + Index: "jaeger-span-0004", + CreationTime: time.Date(2021, 9, 16, 11, 00, 00, 00, time.Local), + Aliases: map[string]bool{ + "jaeger-span-other": true, + }, + }, + { + Index: "jaeger-span-0005", + CreationTime: time.Date(2021, 10, 10, 9, 56, 34, 25, time.Local), + Aliases: map[string]bool{ + "custom-alias": true, + }, + }, + } + indices := []client.Index{ + { + Index: "jaeger-span-0006", + CreationTime: time.Date(2021, 7, 7, 7, 10, 10, 10, time.Local), + }, + { + Index: "jaeger-span-0004", + CreationTime: time.Date(2021, 9, 16, 11, 00, 00, 00, time.Local), + Aliases: map[string]bool{ + "jaeger-span-other": true, + }, + }, + { + Index: "jaeger-span-0005", + CreationTime: time.Date(2021, 10, 10, 9, 56, 34, 25, time.Local), + Aliases: map[string]bool{ + "custom-alias": true, + }, + }, + { + Index: "jaeger-span-0001", + CreationTime: time.Date(2021, 10, 10, 12, 00, 00, 00, time.Local), + Aliases: map[string]bool{ + "jaeger-span-write": true, + "jaeger-span-read": true, + }, + }, + { + Index: "jaeger-span-0002", + CreationTime: time.Date(2021, 11, 10, 12, 30, 00, 00, time.Local), + Aliases: map[string]bool{ + "jaeger-span-read": true, + }, + }, + { + Index: "jaeger-span-0003", + CreationTime: time.Date(2021, 12, 10, 2, 15, 20, 01, time.Local), + Aliases: map[string]bool{ + "jaeger-span-read": true, + }, + }, + } + + result := ByDate(indices, beforeDateFilter) + assert.Equal(t, expectedIndices, result) +}