-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmanager.go
92 lines (73 loc) · 2.02 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package zoidbergtcp
import (
"encoding/json"
"log"
"net/http"
"sync"
"github.com/bobrik/zoidberg/application"
"github.com/bobrik/zoidberg/balancer"
"github.com/bobrik/zoidberg/state"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Manager manages proxies.
//
// It keeps at most one proxy per listen address. UpdateState acquires mutex
// before the proxies map is read or modified.
type Manager struct {
	// mutex is locked by UpdateState for the duration of a state update.
	mutex sync.Mutex
	// proxies maps a listen address (the app's "listen" meta value) to the
	// proxy that was created for that address.
	proxies map[string]*proxy
}
// NewManager returns a proxy manager that is ready to use and tracks no
// proxies yet.
func NewManager() *Manager {
	// The zero value of the mutex is an unlocked mutex, so only the map
	// needs explicit initialisation.
	manager := new(Manager)
	manager.proxies = make(map[string]*proxy)
	return manager
}
// ServeMux returns a ServeMux object that is used to manage proxies.
//
// Registered routes:
//   - "/" (which also matches any path without a more specific pattern)
//     decodes the request body as a JSON balancer.State and applies it with
//     UpdateState. A body that cannot be decoded is answered with
//     400 Bad Request carrying the decoder's error text.
//   - "/metrics" serves the promhttp handler.
//   - "/_health" always answers 204 No Content.
func (m *Manager) ServeMux() *http.ServeMux {
	mux := http.NewServeMux()

	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		// Named update rather than state so the imported state package is
		// not shadowed inside the handler.
		update := balancer.State{}
		if err := json.NewDecoder(r.Body).Decode(&update); err != nil {
			// An undecodable payload is the sender's mistake, so report a
			// client error rather than 500 Internal Server Error.
			http.Error(w, err.Error(), http.StatusBadRequest)
			return
		}

		m.UpdateState(update)
	})

	mux.Handle("/metrics", promhttp.Handler())

	mux.HandleFunc("/_health", func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusNoContent)
	})

	return mux
}
// UpdateState updates manager's view of the world.
//
// The manager's mutex is held for the whole call. Every app in s.Apps is
// passed to updateAppProxies together with the versions recorded for that
// app's name in s.State.Versions.
func (m *Manager) UpdateState(s balancer.State) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	for _, current := range s.Apps {
		versions := s.State.Versions[current.Name]
		m.updateAppProxies(current, versions)
	}
}
// updateAppProxies updates upstreams for running proxies
// and starts new proxies if needed.
//
// The listen address is read from app.Meta["listen"]; an app without one is
// logged and skipped. Proxies are keyed by listen address: if one already
// exists for the address it only receives the new servers and versions,
// otherwise a proxy is created, given its initial state, started in its own
// goroutine and remembered in m.proxies. Failure to create a proxy is logged
// and the app is skipped.
//
// m.proxies is accessed without locking here; UpdateState, the caller in
// this file, holds m.mutex around the call.
func (m *Manager) updateAppProxies(app application.App, versions state.Versions) {
	listen := app.Meta["listen"]
	if listen == "" {
		log.Printf("app %s does not have listen set in meta", app.Name)
		return
	}

	if existing, ok := m.proxies[listen]; ok {
		if existing.app != app.Name {
			// Another app already owns this address; the existing proxy is
			// reused and receives this app's servers below.
			log.Printf("app %s overwrites listen of app %s: %s", app.Name, existing.app, listen)
		}

		existing.setState(app.Servers, versions)
		return
	}

	created, err := newProxy(app.Name, listen)
	if err != nil {
		log.Printf("error creating proxy for app %s on %s: %s", app.Name, listen, err)
		return
	}

	// Set the initial state before the proxy starts.
	created.setState(app.Servers, versions)
	go created.start()
	m.proxies[listen] = created
}