Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose service port from marathon REST API #38

Merged
merged 1 commit into from
Oct 10, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,7 @@ This section tries to explain usage in code comment style:
// Marathon instance configuration
"Marathon": {
// Marathon service HTTP endpoint
"Endpoint": "http://localhost:8080",
// Same configuration as Marathon Zookeeper
"Zookeeper": {
"Host": "zk01.example.com:2812,zk02.example.com:2812",
// Marathon Zookeeper state
// Marathon default set to /marathon/state
"Path": "/marathon/state",
// Number of seconds to delay the reload event
"ReportingDelay": 5
"Endpoint": "http://localhost:8080"
}
},

Expand Down
18 changes: 18 additions & 0 deletions config/haproxy_template.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,21 @@ backend {{ $app.EscapedId }}-cluster
{{ range $page, $task := .Tasks }}
server {{ $app.EscapedId}}-{{ $task.Host }}-{{ $task.Port }} {{ $task.Host }}:{{ $task.Port }} {{ if $app.HealthCheckPath }} check inter 30000 {{ end }} {{ end }}
{{ end }}

##
## map service ports of marathon apps
## ( see https://mesosphere.github.io/marathon/docs/service-discovery-load-balancing.html#ports-assignment ))
## to haproxy frontend port
##
## {{ range $index, $app := .Apps }}
## listen {{ $app.EscapedId }}_{{ $app.ServicePort }}
## bind *:{{ $app.ServicePort }}
## mode http
## {{ if $app.HealthCheckPath }}
## # option httpchk GET {{ $app.HealthCheckPath }}
## {{ end }}
## balance leastconn
## option forwardfor
## {{ range $page, $task := .Tasks }}
## server {{ $app.EscapedId}}-{{ $task.Host }}-{{ $task.Port }} {{ $task.Host }}:{{ $task.Port }} {{ if $app.HealthCheckPath }} check inter 30000 {{ end }} {{ end }}
## {{ end }}
4 changes: 2 additions & 2 deletions main/bamboo/bamboo.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ func main() {
for {
sig := <-signalChannel
if sig == syscall.SIGCHLD {
r := syscall.Rusage {}
syscall.Wait4( -1, nil, 0, &r)
r := syscall.Rusage{}
syscall.Wait4(-1, nil, 0, &r)
}
}
}()
Expand Down
101 changes: 58 additions & 43 deletions services/marathon/marathon.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package marathon

import(
"net/http"
"io/ioutil"
import (
"encoding/json"
"strings"
"io/ioutil"
"net/http"
"sort"
"strings"
)

// Describes an app process running
Expand All @@ -16,10 +16,11 @@ type Task struct {

// An app may have multiple processes
type App struct {
Id string
EscapedId string
Id string
EscapedId string
HealthCheckPath string
Tasks []Task
Tasks []Task
ServicePort int
}

type AppList []App
Expand All @@ -36,21 +37,21 @@ func (slice AppList) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}


type MarathonTaskList []MarathonTask

type MarathonTasks struct {
Tasks MarathonTaskList `json:tasks`
}

type MarathonTask struct {
AppId string
Id string
Host string
Ports []int
StartedAt string
StagedAt string
Version string
AppId string
Id string
Host string
Ports []int
ServicePorts []int
StartedAt string
StagedAt string
Version string
}

func (slice MarathonTaskList) Len() int {
Expand All @@ -70,8 +71,9 @@ type MarathonApps struct {
}

type MarathonApp struct {
Id string `json:id`
Id string `json:id`
HealthChecks []HealthChecks `json:healthChecks`
Ports []int `json:ports`
}

type HealthChecks struct {
Expand All @@ -88,7 +90,7 @@ func fetchMarathonApps(endpoint string) (map[string]MarathonApp, error) {
var appResponse MarathonApps

contents, err := ioutil.ReadAll(response.Body)
if (err != nil) {
if err != nil {
return nil, err
}

Expand All @@ -109,7 +111,7 @@ func fetchMarathonApps(endpoint string) (map[string]MarathonApp, error) {

func fetchTasks(endpoint string) (map[string][]MarathonTask, error) {
client := &http.Client{}
req, err := http.NewRequest("GET", endpoint + "/v2/tasks", nil)
req, err := http.NewRequest("GET", endpoint+"/v2/tasks", nil)
req.Header.Add("Accept", "application/json")
response, err := client.Do(req)

Expand All @@ -120,10 +122,14 @@ func fetchTasks(endpoint string) (map[string][]MarathonTask, error) {
} else {
contents, err := ioutil.ReadAll(response.Body)
defer response.Body.Close()
if err != nil { return nil, err }
if err != nil {
return nil, err
}

err = json.Unmarshal(contents, &tasks)
if err != nil { return nil, err }
if err != nil {
return nil, err
}

taskList := tasks.Tasks
sort.Sort(taskList)
Expand All @@ -144,36 +150,41 @@ func createApps(tasksById map[string][]MarathonTask, marathonApps map[string]Mar

apps := AppList{}

for appId, tasks := range tasksById {
simpleTasks := []Task{}
for appId, tasks := range tasksById {
simpleTasks := []Task{}

for _, task := range tasks {
if len(task.Ports) > 0 {
simpleTasks = append(simpleTasks, Task{ Host: task.Host, Port: task.Ports[0] })
}
for _, task := range tasks {
if len(task.Ports) > 0 {
simpleTasks = append(simpleTasks, Task{Host: task.Host, Port: task.Ports[0]})
}
}

// Try to handle old app id format without slashes
appPath := appId
if (!strings.HasPrefix(appId, "/")) {
appPath = "/" + appId
}
// Try to handle old app id format without slashes
appPath := appId
if !strings.HasPrefix(appId, "/") {
appPath = "/" + appId
}

app := App {
// Since Marathon 0.7, apps are namespaced with path
Id: appPath,
// Used for template
EscapedId: strings.Replace(appId, "/", "::", -1),
Tasks: simpleTasks,
HealthCheckPath: parseHealthCheckPath(marathonApps[appId].HealthChecks),
}
apps = append(apps, app)
app := App{
// Since Marathon 0.7, apps are namespaced with path
Id: appPath,
// Used for template
EscapedId: strings.Replace(appId, "/", "::", -1),
Tasks: simpleTasks,
HealthCheckPath: parseHealthCheckPath(marathonApps[appId].HealthChecks),
}

if len(marathonApps[appId].Ports) > 0 {
app.ServicePort = marathonApps[appId].Ports[0]
}

apps = append(apps, app)
}
return apps
}

func parseHealthCheckPath(checks []HealthChecks) string {
if (len(checks) > 0) {
if len(checks) > 0 {
return checks[0].Path
}
return ""
Expand All @@ -188,10 +199,14 @@ func parseHealthCheckPath(checks []HealthChecks) string {
*/
func FetchApps(endpoint string) (AppList, error) {
tasks, err := fetchTasks(endpoint)
if err != nil { return nil, err }
if err != nil {
return nil, err
}

marathonApps, err := fetchMarathonApps(endpoint)
if err != nil { return nil, err }
if err != nil {
return nil, err
}

apps := createApps(tasks, marathonApps)
sort.Sort(apps)
Expand Down