From 767b0d12ee5da703e414a797815a4932031372ef Mon Sep 17 00:00:00 2001 From: Timo Reimann Date: Tue, 23 May 2017 02:10:22 +0200 Subject: [PATCH] Implement and expose ready state. During deployments (only), we mark applications as ready depending on the outcome of potentially configured readiness checks. The change goes along with some major refactoring, primarily focused towards retrieving all necessary state from a single Marathon API request thanks to the 'embed=apps.{tasks,deployments,readiness}' query parameter. This is necessary in order to retrieve a single, consistent app/task state not skewed by two API requests sent at slightly different offsets. Additionally, we stop considering tasks as ready which have not yet reached the TASK_RUNNING state since still staging or otherwise non-ready tasks are bound to fail if taken into load-balancing rotation prematurely. --- services/marathon/marathon.go | 258 ++++++++++++--------- services/marathon/marathon_test.go | 357 ++++++++++++++++++++++++++++- 2 files changed, 502 insertions(+), 113 deletions(-) diff --git a/services/marathon/marathon.go b/services/marathon/marathon.go index 459934d..c6e9216 100644 --- a/services/marathon/marathon.go +++ b/services/marathon/marathon.go @@ -2,13 +2,17 @@ package marathon import ( "encoding/json" - "github.com/QubitProducts/bamboo/configuration" "io/ioutil" + "log" "net/http" "sort" "strings" + + "github.com/QubitProducts/bamboo/configuration" ) +const taskStateRunning = "TASK_RUNNING" + // Describes an app process running type Task struct { Id string @@ -16,6 +20,8 @@ type Task struct { Port int Ports []int Alive bool + State string + Ready bool } // A health check on the application @@ -36,6 +42,7 @@ type App struct { HealthCheckPath string HealthCheckProtocol string HealthChecks []HealthCheck + ReadinessCheckPath string Tasks []Task ServicePort int ServicePorts []int @@ -58,12 +65,6 @@ func (slice AppList) Swap(i, j int) { slice[i], slice[j] = slice[j], slice[i] } -type 
marathonTaskList []marathonTask - -type marathonTasks struct { - Tasks marathonTaskList `json:"tasks"` -} - type HealthCheckResult struct { Alive bool } @@ -74,12 +75,15 @@ type marathonTask struct { Host string Ports []int ServicePorts []int + State string StartedAt string StagedAt string Version string HealthCheckResults []HealthCheckResult } +type marathonTaskList []marathonTask + func (slice marathonTaskList) Len() int { return len(slice) } @@ -97,11 +101,15 @@ type marathonApps struct { } type marathonApp struct { - Id string `json:"id"` - HealthChecks []marathonHealthCheck `json:"healthChecks"` - Ports []int `json:"ports"` - Env map[string]string `json:"env"` - Labels map[string]string `json:"labels"` + Id string `json:"id"` + HealthChecks []marathonHealthCheck `json:"healthChecks"` + Ports []int `json:"ports"` + Env map[string]string `json:"env"` + Labels map[string]string `json:"labels"` + Deployments []deployment `json:"deployments"` + Tasks marathonTaskList `json:"tasks"` + ReadinessChecks []marathonReadinessCheck `json:"readinessChecks"` + ReadinessCheckResults []readinessCheckResult `json:"readinessCheckResults"` } type marathonHealthCheck struct { @@ -110,110 +118,92 @@ type marathonHealthCheck struct { PortIndex int `json:"portIndex"` } -func fetchMarathonApps(endpoint string, conf *configuration.Configuration) (map[string]marathonApp, error) { - client := &http.Client{} - req, _ := http.NewRequest("GET", endpoint+"/v2/apps", nil) - req.Header.Add("Accept", "application/json") - req.Header.Add("Content-Type", "application/json") - if len(conf.Marathon.User) > 0 && len(conf.Marathon.Password) > 0 { - req.SetBasicAuth(conf.Marathon.User, conf.Marathon.Password) - } - response, err := client.Do(req) +type marathonReadinessCheck struct { + Path string `json:"path"` +} - if err != nil { - return nil, err - } +type deployment struct { + ID string `json:"id"` +} - defer response.Body.Close() - var appResponse marathonApps +type readinessCheckResult struct { 
+ TaskID string `json:"taskId"` + Ready bool `json:"ready"` +} - contents, err := ioutil.ReadAll(response.Body) - if err != nil { - return nil, err - } +/* + Apps returns a struct that describes Marathon current app and their + sub tasks information. - err = json.Unmarshal(contents, &appResponse) - if err != nil { - return nil, err - } + Parameters: + endpoint: Marathon HTTP endpoint, e.g. http://localhost:8080 +*/ +func FetchApps(maraconf configuration.Marathon, conf *configuration.Configuration) (AppList, error) { + var marathonApps []marathonApp + var err error - dataById := map[string]marathonApp{} + // Try all configured endpoints until one succeeds or we exhaust the list, + // whichever comes first. + for _, url := range maraconf.Endpoints() { + marathonApps, err = fetchMarathonApps(url, conf) + if err == nil { + for _, marathonApp := range marathonApps { + sort.Sort(marathonApp.Tasks) + } + apps := createApps(marathonApps) + sort.Sort(apps) + return apps, nil + } + } + // return last error + return nil, err +} - for _, appConfig := range appResponse.Apps { - dataById[appConfig.Id] = appConfig +func fetchMarathonApps(endpoint string, conf *configuration.Configuration) ([]marathonApp, error) { + var appResponse marathonApps + if err := parseJSON(endpoint+"/v2/apps?embed=app.tasks&embed=app.deployments&embed=app.readiness", conf, &appResponse); err != nil { + return nil, err } - return dataById, nil + return appResponse.Apps, nil } -func fetchTasks(endpoint string, conf *configuration.Configuration) (map[string]marathonTaskList, error) { +func parseJSON(url string, conf *configuration.Configuration, out interface{}) error { client := &http.Client{} - req, _ := http.NewRequest("GET", endpoint+"/v2/tasks", nil) + req, _ := http.NewRequest("GET", url, nil) req.Header.Add("Accept", "application/json") req.Header.Add("Content-Type", "application/json") if len(conf.Marathon.User) > 0 && len(conf.Marathon.Password) > 0 { req.SetBasicAuth(conf.Marathon.User, 
conf.Marathon.Password) } - response, err := client.Do(req) - - var tasks marathonTasks + response, err := client.Do(req) if err != nil { - return nil, err + return err } - contents, err := ioutil.ReadAll(response.Body) defer response.Body.Close() - if err != nil { - return nil, err - } - err = json.Unmarshal(contents, &tasks) + contents, err := ioutil.ReadAll(response.Body) if err != nil { - return nil, err - } - - taskList := tasks.Tasks - sort.Sort(taskList) - - tasksById := map[string]marathonTaskList{} - for _, task := range taskList { - if tasksById[task.AppId] == nil { - tasksById[task.AppId] = marathonTaskList{} - } - tasksById[task.AppId] = append(tasksById[task.AppId], task) + return err } - for _, task_list := range tasksById { - sort.Sort(task_list) + err = json.Unmarshal(contents, &out) + if err != nil { + return err } - return tasksById, nil + return nil } -func calculateTaskHealth(healthCheckResults []HealthCheckResult, healthChecks []marathonHealthCheck) bool { - //If we don't even have health check results for every health check, don't count the task as healthy - if len(healthChecks) > len(healthCheckResults) { - return false - } - for _, healthCheck := range healthCheckResults { - if !healthCheck.Alive { - return false - } - } - return true -} - -func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]marathonApp) AppList { +func createApps(marathonApps []marathonApp) AppList { apps := AppList{} - for appId, mApp := range marathonApps { - + for _, mApp := range marathonApps { + appId := mApp.Id // Try to handle old app id format without slashes - appPath := appId - if !strings.HasPrefix(appId, "/") { - appPath = "/" + appId - } + appPath := "/" + strings.TrimPrefix(mApp.Id, "/") // build App from marathonApp app := App{ @@ -222,6 +212,7 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m EscapedId: strings.Replace(appId, "/", "::", -1), HealthCheckPath: parseHealthCheckPath(mApp.HealthChecks), 
HealthCheckProtocol: parseHealthCheckProtocol(mApp.HealthChecks), + ReadinessCheckPath: parseReadinessCheckPath(mApp.ReadinessChecks), Env: mApp.Env, Labels: mApp.Labels, SplitId: strings.Split(appId, "/"), @@ -244,7 +235,7 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m // build Tasks for this App tasks := []Task{} - for _, mTask := range tasksById[appId] { + for _, mTask := range mApp.Tasks { if len(mTask.Ports) > 0 { t := Task{ Id: mTask.Id, @@ -252,6 +243,8 @@ func createApps(tasksById map[string]marathonTaskList, marathonApps map[string]m Port: mTask.Ports[0], Ports: mTask.Ports, Alive: calculateTaskHealth(mTask.HealthCheckResults, mApp.HealthChecks), + State: mTask.State, + Ready: calculateReadiness(mTask, mApp), } tasks = append(tasks, t) } @@ -297,41 +290,82 @@ func parseHealthCheckProtocol(checks []marathonHealthCheck) string { return "" } -/* - Apps returns a struct that describes Marathon current app and their - sub tasks information. - - Parameters: - endpoint: Marathon HTTP endpoint, e.g. http://localhost:8080 -*/ -func FetchApps(maraconf configuration.Marathon, conf *configuration.Configuration) (AppList, error) { +func parseReadinessCheckPath(checks []marathonReadinessCheck) string { + if len(checks) > 0 { + return checks[0].Path + } - var applist AppList - var err error + return "" +} - // try all configured endpoints until one succeeds - for _, url := range maraconf.Endpoints() { - applist, err = _fetchApps(url, conf) - if err == nil { - return applist, err +func calculateTaskHealth(healthCheckResults []HealthCheckResult, healthChecks []marathonHealthCheck) bool { + // If we don't even have health check results for every health check, don't + // count the task as healthy. 
+ if len(healthChecks) > len(healthCheckResults) { + return false + } + for _, healthCheck := range healthCheckResults { + if !healthCheck.Alive { + return false } } - // return last error - return nil, err + return true } -func _fetchApps(url string, conf *configuration.Configuration) (AppList, error) { - tasks, err := fetchTasks(url, conf) - if err != nil { - return nil, err +func calculateReadiness(task marathonTask, maraApp marathonApp) bool { + switch { + case task.State != taskStateRunning: + // By definition, a task not running cannot be ready. + log.Printf("task %s app %s: ready = false [task state %s != required state %s]", task.Id, maraApp.Id, task.State, taskStateRunning) + return false + + case len(maraApp.Deployments) == 0: + // We only care about readiness during deployments; post-deployment readiness + // should be covered by a separate HAProxy health check definition. + log.Printf("task %s app %s: ready = true [no deployment ongoing]", task.Id, maraApp.Id) + return true + + case len(maraApp.ReadinessChecks) == 0: + // Applications without configured readiness checks are always considered + // ready. + log.Printf("task %s app %s: ready = true [no readiness checks on app]", task.Id, maraApp.Id) + return true } - marathonApps, err := fetchMarathonApps(url, conf) - if err != nil { - return nil, err + // Loop through all readiness check results and return the results for + // matching task IDs. 
+ for _, readinessCheckResult := range maraApp.ReadinessCheckResults { + if readinessCheckResult.TaskID == task.Id { + log.Printf("task %s app %s: ready = %t [evaluating readiness check ready state]", task.Id, maraApp.Id, readinessCheckResult.Ready) + return readinessCheckResult.Ready + } } - apps := createApps(tasks, marathonApps) - sort.Sort(apps) - return apps, nil + // There's a corner case sometimes hit where the first new task of a + // deployment goes from TASK_STAGING to TASK_RUNNING without a corresponding + // health check result being included in the API response. This only happens + // in a very short (yet unlucky) time frame and does not repeat for subsequent + // tasks of the same deployment. + // We identify this situation by checking that we are looking at a part of the + // deployment representing a new task (i.e., it has the most recent version + // timestamp while other timestamps exist as well). If that's the case, we + // err on the side of caution and mark it as non-ready. + versions := map[string]bool{} + var maxVersion string + for _, task := range maraApp.Tasks { + versions[task.Version] = true + if maxVersion == "" || maxVersion < task.Version { + maxVersion = task.Version + } + } + if len(versions) > 1 && task.Version == maxVersion { + log.Printf("task %s app %s: ready = false [new task with version %s not included in readiness check results yet]", task.Id, maraApp.Id, maxVersion) + return false + } + + // Finally, we can be certain this task is not part of the deployment (i.e., + // it's an old task that's going to transition into the TASK_KILLING and/or + // TASK_KILLED state as new tasks' readiness checks gradually turn green.) 
+ log.Printf("task %s app %s: ready = true [task not involved in deployment]", task.Id, maraApp.Id) + return true } diff --git a/services/marathon/marathon_test.go b/services/marathon/marathon_test.go index b29040d..cefc5fe 100644 --- a/services/marathon/marathon_test.go +++ b/services/marathon/marathon_test.go @@ -1,8 +1,14 @@ package marathon import ( - . "github.com/smartystreets/goconvey/convey" + "fmt" "testing" + + "net/http" + "net/http/httptest" + + "github.com/QubitProducts/bamboo/configuration" + . "github.com/smartystreets/goconvey/convey" ) func TestGetMesosDnsId_Simple(t *testing.T) { @@ -63,3 +69,352 @@ func TestParseHealthCheckPathMixed(t *testing.T) { }) }) } + +func TestParseJSONRequest(t *testing.T) { + tests := []struct { + user string + password string + wantBasicAuth bool + }{ + { + wantBasicAuth: false, + }, + { + user: "user", + wantBasicAuth: false, + }, + { + password: "password", + wantBasicAuth: false, + }, + { + user: "user", + password: "password", + wantBasicAuth: true, + }, + } + + for _, test := range tests { + test := test + t.Run(fmt.Sprintf("user='%s' password='%s'", test.user, test.password), func(t *testing.T) { + t.Parallel() + conf := configuration.Configuration{ + Marathon: configuration.Marathon{ + User: test.user, + Password: test.password, + }, + } + + var req *http.Request + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + req = r + fmt.Fprint(w, "{}") + })) + defer ts.Close() + + var res interface{} + err := parseJSON(ts.URL, &conf, &res) + if err != nil { + t.Fatalf("parseJSON returned error: %s", err) + } + + if req.Method != http.MethodGet { + t.Errorf("got method '%s', want '%s'", req.Method, http.MethodGet) + } + + for _, hdrKey := range []string{"Accept", "Content-Type"} { + hdrValue := req.Header.Get(hdrKey) + switch { + case hdrValue == "": + t.Errorf("%s header missing", hdrKey) + case hdrValue != "application/json": + t.Errorf("got %s header value '%s', want 
'application/json'", hdrKey, hdrValue) + } + } + + authHdrValue := req.Header.Get("Authorization") + if test.wantBasicAuth != (authHdrValue != "") { + t.Errorf("got Authorization header value '%s', wanted header: %t", authHdrValue, test.wantBasicAuth) + } + }) + } +} + +func TestParseJSONHandling(t *testing.T) { + tests := []struct { + desc string + handler http.Handler + shouldSucceed bool + }{ + { + desc: "request failed", + shouldSucceed: false, + }, + { + desc: "invalid JSON", + handler: http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprint(w, "{") + }), + shouldSucceed: false, + }, + } + + for _, test := range tests { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + var endpoint string + if test.handler != nil { + ts := httptest.NewServer(test.handler) + defer ts.Close() + endpoint = ts.URL + } + + conf := configuration.Configuration{} + var res interface{} + err := parseJSON(endpoint, &conf, res) + + if test.shouldSucceed != (err == nil) { + t.Errorf("got error '%s', wanted error: %t", err, !test.shouldSucceed) + } + }) + } +} + +func TestFetchApps(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + fmt.Fprintf(w, `{ + "apps": [ + { + "id": "/app2WithSlash", + "tasks": [ + { + "id": "task2", + "ports": [8002] + }, + { + "id": "task1", + "ports": [8001] + } + ] + }, + { + "id": "app1WithoutSlash", + "tasks": [ + { + "id": "task1", + "ports": [8001] + }, + { + "id": "task2", + "ports": [8002] + } + ] + } + ] +}`) + })) + defer ts.Close() + + // First Marathon URL is invalid to verify failover behavior. 
+ maraConf := configuration.Marathon{ + Endpoint: fmt.Sprintf("http://127.0.0.1:4242,%s", ts.URL), + } + + apps, err := FetchApps(maraConf, &configuration.Configuration{}) + + if err != nil { + t.Fatalf("FetchApps returned error: %s", err) + } + + if len(apps) < 1 { + t.Fatal("no apps fetched") + } + assertFetchedApp(t, 1, "/app1WithoutSlash", apps[0]) + + if len(apps) < 2 { + t.Fatal("missing second app") + } + assertFetchedApp(t, 2, "/app2WithSlash", apps[1]) + + if len(apps) > 2 { + t.Fatalf("got %d apps, want 2", len(apps)) + } +} + +func TestCalculateReadiness(t *testing.T) { + tests := []struct { + desc string + task marathonTask + app marathonApp + wantReady bool + }{ + { + desc: "non-running task", + task: marathonTask{ + State: "TASK_STAGED", + }, + wantReady: false, + }, + { + desc: "no deployment running for app", + task: marathonTask{ + State: taskStateRunning, + }, + app: marathonApp{ + Deployments: []deployment{}, + }, + wantReady: true, + }, + { + desc: "no readiness checks defined for app", + task: marathonTask{ + State: taskStateRunning, + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{}, + }, + wantReady: true, + }, + { + desc: "readiness check result negative", + task: marathonTask{ + Id: "taskId", + State: taskStateRunning, + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{ + marathonReadinessCheck{ + Path: "/ready", + }, + }, + ReadinessCheckResults: []readinessCheckResult{ + readinessCheckResult{ + Ready: false, + TaskID: "taskId", + }, + }, + }, + wantReady: false, + }, + { + desc: "readiness check result positive", + task: marathonTask{ + Id: "taskId", + State: taskStateRunning, + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{ + marathonReadinessCheck{ + Path: "/ready", + }, + }, + 
ReadinessCheckResults: []readinessCheckResult{ + readinessCheckResult{ + Ready: false, + TaskID: "otherTaskId", + }, + readinessCheckResult{ + Ready: true, + TaskID: "taskId", + }, + }, + }, + wantReady: true, + }, + { + desc: "ready task's readiness check result outstanding", + task: marathonTask{ + Id: "newTaskId", + State: taskStateRunning, + Version: "2017-01-15T00:00:00.000Z", + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{ + marathonReadinessCheck{ + Path: "/ready", + }, + }, + ReadinessCheckResults: []readinessCheckResult{}, + Tasks: marathonTaskList{ + marathonTask{ + Id: "newTaskId", + Version: "2017-01-15T00:00:00.000Z", + }, + marathonTask{ + Id: "oldTaskId", + Version: "2017-01-01T00:00:00.000Z", + }, + }, + }, + wantReady: false, + }, + { + desc: "task not involved in deployment", + task: marathonTask{ + Id: "oldTaskId", + State: taskStateRunning, + Version: "2017-01-01T00:00:00.000Z", + }, + app: marathonApp{ + Deployments: []deployment{ + deployment{ID: "deploymentId"}, + }, + ReadinessChecks: []marathonReadinessCheck{ + marathonReadinessCheck{ + Path: "/ready", + }, + }, + ReadinessCheckResults: []readinessCheckResult{}, + Tasks: marathonTaskList{ + marathonTask{ + Id: "newTaskId", + Version: "2017-01-15T00:00:00.000Z", + }, + marathonTask{ + Id: "oldTaskId", + Version: "2017-01-01T00:00:00.000Z", + }, + }, + }, + wantReady: true, + }, + } + + for _, test := range tests { + test := test + t.Run(test.desc, func(t *testing.T) { + t.Parallel() + gotReady := calculateReadiness(test.task, test.app) + if gotReady != test.wantReady { + t.Errorf("got ready = %t, want ready = %t", gotReady, test.wantReady) + } + }) + } +} + +func assertFetchedApp(t *testing.T, index int, id string, app App) { + if app.Id != id { + t.Errorf("app #%d: got app ID '%s', want '%s'", index, app.Id, id) + } + switch { + case len(app.Tasks) != 2: + t.Errorf("app #%d: got %d tasks, want 2", 
index, len(app.Tasks)) + case app.Tasks[0].Id != "task1": + t.Errorf("app #%d: got ID '%s' for task #1, want 'task1", index, app.Tasks[0].Id) + case app.Tasks[1].Id != "task2": + t.Errorf("app #%d: got ID '%s' for task #2, want 'task2", index, app.Tasks[1].Id) + } +}