Skip to content

Commit

Permalink
feat(remote): Implement lazy distributed feature (#64)
Browse files Browse the repository at this point in the history
THIS IS AN EXPERIMENTAL FEATURE/IMPLEMENTATION, AND IT MAY BE REMOVED IN THE FUTURE.

Note that for now, it will be an undocumented feature.
  • Loading branch information
TwiN committed Jul 29, 2022
1 parent 2b44833 commit 15e6107
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 61 deletions.
17 changes: 17 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/TwiN/gatus/v4/alerting/alert"
"github.com/TwiN/gatus/v4/alerting/provider"
"github.com/TwiN/gatus/v4/config/maintenance"
"github.com/TwiN/gatus/v4/config/remote"
"github.com/TwiN/gatus/v4/config/ui"
"github.com/TwiN/gatus/v4/config/web"
"github.com/TwiN/gatus/v4/core"
Expand Down Expand Up @@ -85,6 +86,10 @@ type Config struct {
// Maintenance is the configuration for creating a maintenance window in which no alerts are sent
Maintenance *maintenance.Config `yaml:"maintenance,omitempty"`

// Remote is the configuration for remote Gatus instances
// WARNING: This is in ALPHA and may change or be completely removed in the future
Remote *remote.Config `yaml:"remote,omitempty"`

filePath string // path to the file from which config was loaded from
lastFileModTime time.Time // last modification time
}
Expand Down Expand Up @@ -185,10 +190,22 @@ func parseAndValidateConfigBytes(yamlBytes []byte) (config *Config, err error) {
if err := validateStorageConfig(config); err != nil {
return nil, err
}
if err := validateRemoteConfig(config); err != nil {
return nil, err
}
}
return
}

func validateRemoteConfig(config *Config) error {
if config.Remote != nil {
if err := config.Remote.ValidateAndSetDefaults(); err != nil {
return err
}
}
return nil
}

func validateStorageConfig(config *Config) error {
if config.Storage == nil {
config.Storage = &storage.Config{
Expand Down
38 changes: 38 additions & 0 deletions config/remote/remote.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package remote

import (
"log"

"github.com/TwiN/gatus/v4/client"
)

// NOTICE: This is an experimental alpha feature and may be updated/removed in future versions.
// For more information, see https://github.com/TwiN/gatus/issues/64

type Config struct {
// Instances is a list of remote instances to retrieve endpoint statuses from.
Instances []Instance `yaml:"instances,omitempty"`

// ClientConfig is the configuration of the client used to communicate with the provider's target
ClientConfig *client.Config `yaml:"client,omitempty"`
}

type Instance struct {
EndpointPrefix string `yaml:"endpoint-prefix"`
URL string `yaml:"url"`
}

func (c *Config) ValidateAndSetDefaults() error {
if c.ClientConfig == nil {
c.ClientConfig = client.GetDefaultConfig()
} else {
if err := c.ClientConfig.ValidateAndSetDefaults(); err != nil {
return err
}
}
if len(c.Instances) > 0 {
log.Println("WARNING: Your configuration is using 'remote', which is in alpha and may be updated/removed in future versions.")
log.Println("WARNING: See https://github.com/TwiN/gatus/issues/64 for more information")
}
return nil
}
11 changes: 5 additions & 6 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ import (
"os"
"time"

"github.com/TwiN/gatus/v4/config"
"github.com/TwiN/gatus/v4/config/ui"
"github.com/TwiN/gatus/v4/config/web"
"github.com/TwiN/gatus/v4/controller/handler"
"github.com/TwiN/gatus/v4/security"
)

var (
Expand All @@ -21,19 +20,19 @@ var (
)

// Handle creates the router and starts the server
func Handle(securityConfig *security.Config, webConfig *web.Config, uiConfig *ui.Config, enableMetrics bool) {
var router http.Handler = handler.CreateRouter(ui.StaticFolder, securityConfig, uiConfig, enableMetrics)
func Handle(cfg *config.Config) {
var router http.Handler = handler.CreateRouter(ui.StaticFolder, cfg)
if os.Getenv("ENVIRONMENT") == "dev" {
router = handler.DevelopmentCORS(router)
}
server = &http.Server{
Addr: fmt.Sprintf("%s:%d", webConfig.Address, webConfig.Port),
Addr: fmt.Sprintf("%s:%d", cfg.Web.Address, cfg.Web.Port),
Handler: router,
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
IdleTimeout: 15 * time.Second,
}
log.Println("[controller][Handle] Listening on " + webConfig.SocketAddress())
log.Println("[controller][Handle] Listening on " + cfg.Web.SocketAddress())
if os.Getenv("ROUTER_TEST") == "true" {
return
}
Expand Down
2 changes: 1 addition & 1 deletion controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestHandle(t *testing.T) {
_ = os.Setenv("ROUTER_TEST", "true")
_ = os.Setenv("ENVIRONMENT", "dev")
defer os.Clearenv()
Handle(cfg.Security, cfg.Web, cfg.UI, cfg.Metrics)
Handle(cfg)
defer Shutdown()
request, _ := http.NewRequest("GET", "/health", http.NoBody)
responseRecorder := httptest.NewRecorder()
Expand Down
2 changes: 1 addition & 1 deletion controller/handler/badge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestBadge(t *testing.T) {
}
watchdog.UpdateEndpointStatuses(cfg.Endpoints[0], &core.Result{Success: true, Connected: true, Duration: time.Millisecond, Timestamp: time.Now()})
watchdog.UpdateEndpointStatuses(cfg.Endpoints[1], &core.Result{Success: false, Connected: false, Duration: time.Second, Timestamp: time.Now()})
router := CreateRouter("../../web/static", cfg.Security, nil, cfg.Metrics)
router := CreateRouter("../../web/static", cfg)
type Scenario struct {
Name string
Path string
Expand Down
2 changes: 1 addition & 1 deletion controller/handler/chart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestResponseTimeChart(t *testing.T) {
}
watchdog.UpdateEndpointStatuses(cfg.Endpoints[0], &core.Result{Success: true, Duration: time.Millisecond, Timestamp: time.Now()})
watchdog.UpdateEndpointStatuses(cfg.Endpoints[1], &core.Result{Success: false, Duration: time.Second, Timestamp: time.Now()})
router := CreateRouter("../../web/static", cfg.Security, nil, cfg.Metrics)
router := CreateRouter("../../web/static", cfg)
type Scenario struct {
Name string
Path string
Expand Down
116 changes: 81 additions & 35 deletions controller/handler/endpoint_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@ import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strings"
"time"

"github.com/TwiN/gatus/v4/client"
"github.com/TwiN/gatus/v4/config"
"github.com/TwiN/gatus/v4/config/remote"
"github.com/TwiN/gatus/v4/core"
"github.com/TwiN/gatus/v4/storage/store"
"github.com/TwiN/gatus/v4/storage/store/common"
"github.com/TwiN/gatus/v4/storage/store/common/paging"
Expand All @@ -28,48 +33,89 @@ var (
// EndpointStatuses handles requests to retrieve all EndpointStatus
// Due to the size of the response, this function leverages a cache.
// Must not be wrapped by GzipHandler
func EndpointStatuses(writer http.ResponseWriter, r *http.Request) {
page, pageSize := extractPageAndPageSizeFromRequest(r)
gzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip")
var exists bool
var value interface{}
if gzipped {
writer.Header().Set("Content-Encoding", "gzip")
value, exists = cache.Get(fmt.Sprintf("endpoint-status-%d-%d-gzipped", page, pageSize))
} else {
value, exists = cache.Get(fmt.Sprintf("endpoint-status-%d-%d", page, pageSize))
func EndpointStatuses(cfg *config.Config) http.HandlerFunc {
return func(writer http.ResponseWriter, r *http.Request) {
page, pageSize := extractPageAndPageSizeFromRequest(r)
gzipped := strings.Contains(r.Header.Get("Accept-Encoding"), "gzip")
var exists bool
var value interface{}
if gzipped {
writer.Header().Set("Content-Encoding", "gzip")
value, exists = cache.Get(fmt.Sprintf("endpoint-status-%d-%d-gzipped", page, pageSize))
} else {
value, exists = cache.Get(fmt.Sprintf("endpoint-status-%d-%d", page, pageSize))
}
var data []byte
if !exists {
var err error
buffer := &bytes.Buffer{}
gzipWriter := gzip.NewWriter(buffer)
endpointStatuses, err := store.Get().GetAllEndpointStatuses(paging.NewEndpointStatusParams().WithResults(page, pageSize))
if err != nil {
log.Printf("[handler][EndpointStatuses] Failed to retrieve endpoint statuses: %s", err.Error())
http.Error(writer, err.Error(), http.StatusInternalServerError)
return
}
// ALPHA: Retrieve endpoint statuses from remote instances
if endpointStatusesFromRemote, err := getEndpointStatusesFromRemoteInstances(cfg.Remote); err != nil {
log.Printf("[handler][EndpointStatuses] Silently failed to retrieve endpoint statuses from remote: %s", err.Error())
} else if endpointStatusesFromRemote != nil {
endpointStatuses = append(endpointStatuses, endpointStatusesFromRemote...)
}
// Marshal endpoint statuses to JSON
data, err = json.Marshal(endpointStatuses)
if err != nil {
log.Printf("[handler][EndpointStatuses] Unable to marshal object to JSON: %s", err.Error())
http.Error(writer, "unable to marshal object to JSON", http.StatusInternalServerError)
return
}
_, _ = gzipWriter.Write(data)
_ = gzipWriter.Close()
gzippedData := buffer.Bytes()
cache.SetWithTTL(fmt.Sprintf("endpoint-status-%d-%d", page, pageSize), data, cacheTTL)
cache.SetWithTTL(fmt.Sprintf("endpoint-status-%d-%d-gzipped", page, pageSize), gzippedData, cacheTTL)
if gzipped {
data = gzippedData
}
} else {
data = value.([]byte)
}
writer.Header().Add("Content-Type", "application/json")
writer.WriteHeader(http.StatusOK)
_, _ = writer.Write(data)
}
}

func getEndpointStatusesFromRemoteInstances(remoteConfig *remote.Config) ([]*core.EndpointStatus, error) {
if remoteConfig == nil || len(remoteConfig.Instances) == 0 {
return nil, nil
}
var data []byte
if !exists {
var err error
buffer := &bytes.Buffer{}
gzipWriter := gzip.NewWriter(buffer)
endpointStatuses, err := store.Get().GetAllEndpointStatuses(paging.NewEndpointStatusParams().WithResults(page, pageSize))
var endpointStatusesFromAllRemotes []*core.EndpointStatus
httpClient := client.GetHTTPClient(remoteConfig.ClientConfig)
for _, instance := range remoteConfig.Instances {
response, err := httpClient.Get(instance.URL)
if err != nil {
log.Printf("[handler][EndpointStatuses] Failed to retrieve endpoint statuses: %s", err.Error())
http.Error(writer, err.Error(), http.StatusInternalServerError)
return
return nil, err
}
data, err = json.Marshal(endpointStatuses)
body, err := io.ReadAll(response.Body)
if err != nil {
log.Printf("[handler][EndpointStatuses] Unable to marshal object to JSON: %s", err.Error())
http.Error(writer, "unable to marshal object to JSON", http.StatusInternalServerError)
return
_ = response.Body.Close()
log.Printf("[handler][getEndpointStatusesFromRemoteInstances] Silently failed to retrieve endpoint statuses from %s: %s", instance.URL, err.Error())
continue
}
_, _ = gzipWriter.Write(data)
_ = gzipWriter.Close()
gzippedData := buffer.Bytes()
cache.SetWithTTL(fmt.Sprintf("endpoint-status-%d-%d", page, pageSize), data, cacheTTL)
cache.SetWithTTL(fmt.Sprintf("endpoint-status-%d-%d-gzipped", page, pageSize), gzippedData, cacheTTL)
if gzipped {
data = gzippedData
var endpointStatuses []*core.EndpointStatus
if err = json.Unmarshal(body, &endpointStatuses); err != nil {
_ = response.Body.Close()
log.Printf("[handler][getEndpointStatusesFromRemoteInstances] Silently failed to retrieve endpoint statuses from %s: %s", instance.URL, err.Error())
continue
}
} else {
data = value.([]byte)
_ = response.Body.Close()
for _, endpointStatus := range endpointStatuses {
endpointStatus.Name = instance.EndpointPrefix + endpointStatus.Name
}
endpointStatusesFromAllRemotes = append(endpointStatusesFromAllRemotes, endpointStatuses...)
}
writer.Header().Add("Content-Type", "application/json")
writer.WriteHeader(http.StatusOK)
_, _ = writer.Write(data)
return endpointStatusesFromAllRemotes, nil
}

// EndpointStatus retrieves a single core.EndpointStatus by group and endpoint name
Expand Down
4 changes: 2 additions & 2 deletions controller/handler/endpoint_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func TestEndpointStatus(t *testing.T) {
}
watchdog.UpdateEndpointStatuses(cfg.Endpoints[0], &core.Result{Success: true, Duration: time.Millisecond, Timestamp: time.Now()})
watchdog.UpdateEndpointStatuses(cfg.Endpoints[1], &core.Result{Success: false, Duration: time.Second, Timestamp: time.Now()})
router := CreateRouter("../../web/static", cfg.Security, nil, cfg.Metrics)
router := CreateRouter("../../web/static", cfg)

type Scenario struct {
Name string
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestEndpointStatuses(t *testing.T) {
// Can't be bothered dealing with timezone issues on the worker that runs the automated tests
firstResult.Timestamp = time.Time{}
secondResult.Timestamp = time.Time{}
router := CreateRouter("../../web/static", nil, nil, false)
router := CreateRouter("../../web/static", &config.Config{Metrics: true})

type Scenario struct {
Name string
Expand Down
4 changes: 3 additions & 1 deletion controller/handler/favicon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"net/http"
"net/http/httptest"
"testing"

"github.com/TwiN/gatus/v4/config"
)

func TestFavIcon(t *testing.T) {
router := CreateRouter("../../web/static", nil, nil, false)
router := CreateRouter("../../web/static", &config.Config{})
type Scenario struct {
Name string
Path string
Expand Down
21 changes: 10 additions & 11 deletions controller/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,31 @@ package handler
import (
"net/http"

"github.com/TwiN/gatus/v4/config/ui"
"github.com/TwiN/gatus/v4/security"
"github.com/TwiN/gatus/v4/config"
"github.com/TwiN/health"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func CreateRouter(staticFolder string, securityConfig *security.Config, uiConfig *ui.Config, enabledMetrics bool) *mux.Router {
func CreateRouter(staticFolder string, cfg *config.Config) *mux.Router {
router := mux.NewRouter()
if enabledMetrics {
if cfg.Metrics {
router.Handle("/metrics", promhttp.Handler()).Methods("GET")
}
api := router.PathPrefix("/api").Subrouter()
protected := api.PathPrefix("/").Subrouter()
unprotected := api.PathPrefix("/").Subrouter()
if securityConfig != nil {
if err := securityConfig.RegisterHandlers(router); err != nil {
if cfg.Security != nil {
if err := cfg.Security.RegisterHandlers(router); err != nil {
panic(err)
}
if err := securityConfig.ApplySecurityMiddleware(protected); err != nil {
if err := cfg.Security.ApplySecurityMiddleware(protected); err != nil {
panic(err)
}
}
// Endpoints
unprotected.Handle("/v1/config", ConfigHandler{securityConfig: securityConfig}).Methods("GET")
protected.HandleFunc("/v1/endpoints/statuses", EndpointStatuses).Methods("GET") // No GzipHandler for this one, because we cache the content as Gzipped already
unprotected.Handle("/v1/config", ConfigHandler{securityConfig: cfg.Security}).Methods("GET")
protected.HandleFunc("/v1/endpoints/statuses", EndpointStatuses(cfg)).Methods("GET") // No GzipHandler for this one, because we cache the content as Gzipped already
protected.HandleFunc("/v1/endpoints/{key}/statuses", GzipHandlerFunc(EndpointStatus)).Methods("GET")
unprotected.HandleFunc("/v1/endpoints/{key}/health/badge.svg", HealthBadge).Methods("GET")
unprotected.HandleFunc("/v1/endpoints/{key}/uptimes/{duration}/badge.svg", UptimeBadge).Methods("GET")
Expand All @@ -38,8 +37,8 @@ func CreateRouter(staticFolder string, securityConfig *security.Config, uiConfig
router.Handle("/health", health.Handler().WithJSON(true)).Methods("GET")
router.HandleFunc("/favicon.ico", FavIcon(staticFolder)).Methods("GET")
// SPA
router.HandleFunc("/endpoints/{name}", SinglePageApplication(staticFolder, uiConfig)).Methods("GET")
router.HandleFunc("/", SinglePageApplication(staticFolder, uiConfig)).Methods("GET")
router.HandleFunc("/endpoints/{name}", SinglePageApplication(staticFolder, cfg.UI)).Methods("GET")
router.HandleFunc("/", SinglePageApplication(staticFolder, cfg.UI)).Methods("GET")
// Everything else falls back on static content
router.PathPrefix("/").Handler(GzipHandler(http.FileServer(http.Dir(staticFolder))))
return router
Expand Down
4 changes: 3 additions & 1 deletion controller/handler/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ import (
"net/http"
"net/http/httptest"
"testing"

"github.com/TwiN/gatus/v4/config"
)

func TestCreateRouter(t *testing.T) {
router := CreateRouter("../../web/static", nil, nil, true)
router := CreateRouter("../../web/static", &config.Config{Metrics: true})
type Scenario struct {
Name string
Path string
Expand Down
2 changes: 1 addition & 1 deletion controller/handler/spa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestSinglePageApplication(t *testing.T) {
}
watchdog.UpdateEndpointStatuses(cfg.Endpoints[0], &core.Result{Success: true, Duration: time.Millisecond, Timestamp: time.Now()})
watchdog.UpdateEndpointStatuses(cfg.Endpoints[1], &core.Result{Success: false, Duration: time.Second, Timestamp: time.Now()})
router := CreateRouter("../../web/static", cfg.Security, nil, cfg.Metrics)
router := CreateRouter("../../web/static", cfg)
type Scenario struct {
Name string
Path string
Expand Down
Loading

0 comments on commit 15e6107

Please sign in to comment.