Skip to content

Commit

Permalink
Use built-in exporter (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
tamalsaha authored Jun 7, 2017
1 parent 8f77a76 commit 473e743
Show file tree
Hide file tree
Showing 41 changed files with 10,747 additions and 156 deletions.
155 changes: 155 additions & 0 deletions exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package main

import (
"fmt"
"net/http"
"time"

"github.com/appscode/go/runtime"
"github.com/appscode/pat"
tapi "github.com/k8sdb/apimachinery/api"
ese "github.com/k8sdb/elasticsearch_exporter/exporter"
pge "github.com/k8sdb/postgres_exporter/exporter"
"github.com/orcaman/concurrent-map"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/log"
"gopkg.in/ini.v1"
kerr "k8s.io/kubernetes/pkg/api/errors"
)

const (
ParamNamespace = ":namespace"
ParamType = ":type"
ParamName = ":name"
ParamPodIP = ":ip"
)

var (
registerers = cmap.New() // URL.path => *prometheus.Registry
)

func DeleteRegistry(w http.ResponseWriter, r *http.Request) {
defer runtime.HandleCrash()

registerers.Remove(r.URL.Path)
w.WriteHeader(http.StatusOK)
}

func ExportMetrics(w http.ResponseWriter, r *http.Request) {
defer runtime.HandleCrash()

params, found := pat.FromContext(r.Context())
if !found {
http.Error(w, "Missing parameters", http.StatusBadRequest)
return
}
namespace := params.Get(ParamNamespace)
if namespace == "" {
http.Error(w, "Missing parameter "+ParamNamespace, http.StatusBadRequest)
return
}
dbType := params.Get(ParamType)
if dbType == "" {
http.Error(w, "Missing parameter "+ParamType, http.StatusBadRequest)
return
}
dbName := params.Get(ParamName)
if dbName == "" {
http.Error(w, "Missing parameter "+ParamName, http.StatusBadRequest)
return
}
podIP := params.Get(ParamPodIP)
if podIP == "" {
http.Error(w, "Missing parameter "+ParamPodIP, http.StatusBadRequest)
return
}

switch dbType {
case tapi.ResourceTypePostgres:
var reg *prometheus.Registry
if val, ok := registerers.Get(r.URL.Path); ok {
reg = val.(*prometheus.Registry)
} else {
reg = prometheus.NewRegistry()
if absent := registerers.SetIfAbsent(r.URL.Path, reg); !absent {
r2, _ := registerers.Get(r.URL.Path)
reg = r2.(*prometheus.Registry)
} else {
log.Infof("Configuring exporter for PostgreSQL %s in namespace %s", dbName, namespace)
db, err := dbClient.Postgreses(namespace).Get(dbName)
if kerr.IsNotFound(err) {
http.NotFound(w, r)
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
conn, err := getPostgresURL(db, podIP)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
reg.MustRegister(pge.NewExporter(conn, ""))
}
}
promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(w, r)
return
case tapi.ResourceTypeElastic:
var reg *prometheus.Registry
if val, ok := registerers.Get(r.URL.Path); ok {
reg = val.(*prometheus.Registry)
} else {
reg = prometheus.NewRegistry()
if absent := registerers.SetIfAbsent(r.URL.Path, reg); !absent {
r2, _ := registerers.Get(r.URL.Path)
reg = r2.(*prometheus.Registry)
} else {
log.Infof("Configuring exporter for Elasticsearch %s in namespace %s", dbName, namespace)
_, err := dbClient.Elastics(namespace).Get(dbName)
if kerr.IsNotFound(err) {
http.NotFound(w, r)
return
} else if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
esURI := fmt.Sprintf("http://%s:9200", podIP)
nodesStatsURI := esURI + "/_nodes/_local/stats"
clusterHealthURI := esURI + "/_cluster/health"
esTimeout := 5 * time.Second
esAllNodes := false
exporter := ese.NewExporter(nodesStatsURI, clusterHealthURI, esTimeout, esAllNodes, nil)
reg.MustRegister(exporter)
}
}
promhttp.HandlerFor(reg, promhttp.HandlerOpts{}).ServeHTTP(w, r)
return
}
http.NotFound(w, r)
}

func getPostgresURL(db *tapi.Postgres, podIP string) (string, error) {
secret, err := kubeClient.Core().Secrets(db.Namespace).Get(db.Spec.DatabaseSecret.SecretName)
if err != nil {
return "", err
}
cfg, err := ini.Load(secret.Data[".admin"])
if err != nil {
return "", err
}
section, err := cfg.GetSection("")
if err != nil {
return "", err
}
user := "postgres"
if k, err := section.GetKey("POSTGRES_USER"); err == nil {
user = k.Value()
}
var password string
if k, err := section.GetKey("POSTGRES_PASSWORD"); err == nil {
password = k.Value()
}
conn := fmt.Sprintf("postgres://%s:%s@%s:5432", user, password, podIP)
return conn, nil
}
28 changes: 21 additions & 7 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
package: github.com/k8sdb/operator
import:
- package: github.com/appscode/go
- package: github.com/appscode/pat
- package: github.com/k8sdb/apimachinery
version: master
- package: github.com/k8sdb/elasticsearch
version: master
- package: github.com/k8sdb/postgres
version: master
- package: github.com/k8sdb/elasticsearch_exporter
version: master
- package: github.com/k8sdb/postgres_exporter
version: master
- package: github.com/orcaman/concurrent-map
version: master
- package: github.com/spf13/cobra
version: master
- package: github.com/heroku/docker-registry-client
Expand All @@ -30,3 +39,8 @@ import:
- package: github.com/prometheus/client_golang
version: v0.8.0
- package: github.com/prometheus/common
- package: github.com/sirupsen/logrus
version: v0.11.5
- package: github.com/ghodss/yaml
- package: gopkg.in/yaml.v2
version: v2
28 changes: 14 additions & 14 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ var (
esOperatorTag string = "0.1.0"
elasticDumpTag string = "2.4.2"
address string = ":8080"
exporterNamespace string = namespace()
exporterTag string = "0.1.0"
operatorNamespace string = namespace()
enableAnalytics bool = true

kubeClient clientset.Interface
dbClient tcs.ExtensionInterface
)

func NewCmdRun() *cobra.Command {
Expand All @@ -59,10 +61,6 @@ func NewCmdRun() *cobra.Command {
// elasticdump flags
cmd.Flags().StringVar(&elasticDumpTag, "elasticdump.tag", elasticDumpTag, "Tag of elasticdump")

// exporter tags
cmd.Flags().StringVar(&exporterNamespace, "exporter.namespace", exporterNamespace, "Namespace for monitoring exporter")
cmd.Flags().StringVar(&exporterTag, "exporter.tag", exporterTag, "Tag of monitoring exporter")

// Analytics flags
cmd.Flags().BoolVar(&enableAnalytics, "analytics", enableAnalytics, "Send analytical event to Google Analytics")

Expand All @@ -85,8 +83,8 @@ func run() {
log.Fatalf("Could not get Kubernetes config: %s", err)
}

client := clientset.NewForConfigOrDie(config)
extClient := tcs.NewForConfigOrDie(config)
kubeClient := clientset.NewForConfigOrDie(config)
dbClient := tcs.NewForConfigOrDie(config)

cgConfig, err := cgcmd.BuildConfigFromFlags(masterURL, kubeconfigPath)
if err != nil {
Expand All @@ -107,27 +105,29 @@ func run() {

defer runtime.HandleCrash()

pgCtrl.New(client, extClient, promClient, pgCtrl.Options{
pgCtrl.New(kubeClient, dbClient, promClient, pgCtrl.Options{
GoverningService: governingService,
ExporterNamespace: exporterNamespace,
ExporterTag: exporterTag,
OperatorNamespace: operatorNamespace,
EnableAnalytics: enableAnalytics,
}).Run()

// Need to wait for sometime to run another controller.
// Or multiple controller will try to create common TPR simultaneously which gives error
time.Sleep(time.Second * 10)
esCtrl.New(client, extClient, promClient, esCtrl.Options{
esCtrl.New(kubeClient, dbClient, promClient, esCtrl.Options{
GoverningService: governingService,
ElasticDumpTag: elasticDumpTag,
OperatorTag: esOperatorTag,
ExporterNamespace: exporterNamespace,
ExporterTag: exporterTag,
OperatorNamespace: operatorNamespace,
EnableAnalytics: enableAnalytics,
}).Run()

m := pat.New()
m.Get("/metrics", promhttp.Handler())
pattern := fmt.Sprintf("/kubedb.com/v1beta1/namespaces/%s/%s/%s/pods/%s/metrics", ParamNamespace, ParamType, ParamName, ParamPodIP)
log.Infoln("URL pattern:", pattern)
m.Get(pattern, http.HandlerFunc(ExportMetrics))
m.Del(pattern, http.HandlerFunc(DeleteRegistry))
http.Handle("/", m)

log.Infof("Starting Server: %s", address)
Expand Down
Loading

0 comments on commit 473e743

Please sign in to comment.