diff --git a/Gopkg.lock b/Gopkg.lock index d13cc54..17611e9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -103,6 +103,12 @@ packages = ["."] revision = "f3f9494671f93fcff853e3c6e9e948b3eb71e590" +[[projects]] + name = "github.com/go-stack/stack" + packages = ["."] + revision = "817915b46b97fd7bb80e8ab6b69f01a53ac3eebf" + version = "v1.6.0" + [[projects]] name = "github.com/gogo/protobuf" packages = ["proto","sortkeys"] @@ -146,10 +152,10 @@ version = "0.2.2" [[projects]] + branch = "master" name = "github.com/inconshreveable/log15" - packages = ["."] - revision = "b105bd37f74e5d9dc7b6ad7806715c7a2b83fd3f" - version = "v2.11" + packages = [".","term"] + revision = "74a0988b5f804e8ce9ff74fca4f16980776dff29" [[projects]] branch = "master" @@ -265,12 +271,6 @@ packages = ["internal/gen","internal/triegen","internal/ucd","secure/bidirule","transform","unicode/bidi","unicode/cldr","unicode/norm","unicode/rangetable","width"] revision = "836efe42bb4aa16aaa17b9c155d8813d336ed720" -[[projects]] - name = "gopkg.in/inconshreveable/log15.v2" - packages = ["stack","term"] - revision = "dc7890abeaadcb6a79a9a5ee30bc1897bbf97713" - version = "v2.10" - [[projects]] name = "gopkg.in/inf.v0" packages = ["."] @@ -304,6 +304,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "34d8ab7a8049e631527276ab85ee79f0741daff5d1e939065559b0eea8a85173" + inputs-digest = "ce31e62f846c3493e12c8c37099429d30e83253762628f3840df3aacb8846eec" solver-name = "gps-cdcl" solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml index 323002c..277acaf 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -1,21 +1,3 @@ - -# Gopkg.toml example -# -# Refer to https://github.com/golang/dep/blob/master/docs/Gopkg.toml.md -# for detailed Gopkg.toml documentation. -# -# required = ["github.com/user/thing/cmd/thing"] -# ignored = ["github.com/user/project/pkgX", "bitbucket.org/user/project/pkgA/pkgY"] -# -# [[constraint]] -# name = "github.com/user/project" -# version = "1.0.0" -# -# [[constraint]] -# name = "github.com/user/project2" -# branch = "dev" -# source = "github.com/myfork/project2" -# -# [[override]] -# name = "github.com/x/y" -# version = "2.4.0" +[[constraint]] + name = "github.com/inconshreveable/log15" + branch = "master" diff --git a/Makefile b/Makefile index 9efa266..1052876 100644 --- a/Makefile +++ b/Makefile @@ -22,8 +22,8 @@ container: $(BIN) docker build -t $(CONTAINER):latest . docker push $(CONTAINER):latest -manifest: $(MANIFEST) -$(MANIFEST): +manifest: + @make -C $(MANIFEST_DIR) clean @make -C $(MANIFEST_DIR) clean: @@ -31,7 +31,6 @@ clean: @make -C bpf/ clean deploy-clean: clean - @make -C $(MANIFEST_DIR) clean docker rmi $(CONTAINER):latest deps: build-deps diff --git a/bpf/Makefile b/bpf/Makefile index 3fb497a..274a999 100644 --- a/bpf/Makefile +++ b/bpf/Makefile @@ -1,7 +1,6 @@ OUTDIR=out OUTFILE=$(OUTDIR)/cgnet.o - .PHONY: all build clean all: $(OUTFILE) bindata.go @@ -18,6 +17,5 @@ bindata.go: go-bindata -pkg bpf out/ clean: - rm -rf cgnet.o rm -rf bindata.go rm -rf $(OUTFILE) diff --git a/bpf/bpf.go b/bpf/bpf.go index 69cf106..9fe57c3 100644 --- a/bpf/bpf.go +++ b/bpf/bpf.go @@ -1,3 +1,19 @@ +/* +Copyright 2017 Kinvolk GmbH + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package bpf import ( @@ -5,10 +21,9 @@ import ( "fmt" "log" "os" - "time" "unsafe" - "github.com/iovisor/gobpf/elf" + bpf "github.com/iovisor/gobpf/elf" ) /* @@ -16,88 +31,79 @@ import ( */ import "C" -const assetpath string = "out/cgnet.o" -const mapname string = "count" +const ( + assetPath string = "out/cgnet.o" + mapName string = "count_map" +) + +func Attach(cgroupPath string) (*Controller, error) { + b, err := initModule() + if err != nil { + return nil, err + } -var ( - zero uint32 = 0 - packetsKey uint32 = zero - bytesKey uint32 = 1 + for prog := range b.IterCgroupProgram() { + if err := bpf.AttachCgroupProgram(prog, cgroupPath, bpf.EgressType); err != nil { + return nil, fmt.Errorf("error attaching to cgroup %s: %s", cgroupPath, err) + } + } - b *elf.Module -) + if err := initMap(b); err != nil { + return nil, err + } -func Setup(cgroupPath string) error { - log.SetFlags(log.LstdFlags | log.Lshortfile) + ctl := &Controller{ + cgroup: cgroupPath, + module: b, + quit: make(chan struct{}), + } + return ctl, nil +} - // for quick development on the bpf program +func initModule() (*bpf.Module, error) { + var b *bpf.Module if path := os.Getenv("BPF_PROG_PATH"); path != "" { - fmt.Printf("using: %s\n", path) - b = elf.NewModule(path) + // for development + b = bpf.NewModule(path) } else { - reader := bytes.NewReader(MustAsset(assetpath)) - b = elf.NewModuleFromReader(reader) + // from assets + reader := bytes.NewReader(MustAsset(assetPath)) + b = bpf.NewModuleFromReader(reader) } if b == nil { - return fmt.Errorf("System doesn't support BPF") + return nil, fmt.Errorf("system doesn't seem to support BPF") } if err := b.Load(nil); err != nil { - return fmt.Errorf("%s", err) - } - - for prog := range b.IterCgroupProgram() { - if err := elf.AttachCgroupProgram(prog, cgroupPath, elf.EgressType); err != nil { - return fmt.Errorf("%s", err) - } - } - - mp := b.Map(mapname) - if mp == nil { - return fmt.Errorf("Can't find map '%s'", mapname) + return nil, fmt.Errorf("loading module failed: %s", err) } + return b, nil +} - if err := b.UpdateElement(mp, unsafe.Pointer(&packetsKey), unsafe.Pointer(&zero), C.BPF_ANY); err != nil { +func initMap(b *bpf.Module) error { + if err := update(b, packetsKey, 0); err != nil { return fmt.Errorf("error updating map: %s", err) } - - if err := b.UpdateElement(mp, unsafe.Pointer(&bytesKey), unsafe.Pointer(&zero), C.BPF_EXIST); err != nil { + if err := update(b, bytesKey, 0); err != nil { return fmt.Errorf("error updating map: %s", err) } - - fmt.Println("Ready.") return nil } -func UpdateLoop(quit chan struct{}) error { - mp := b.Map(mapname) - if mp == nil { - return fmt.Errorf("Can't find map '%s'", mapname) - } +func update(b *bpf.Module, key uint32, value uint64) error { - var packets, bytes uint64 - - for { - select { - case <-quit: - return nil - case <-time.After(1000 * time.Millisecond): - if err := updateElements(mp, packets, bytes); err != nil { - return err - } - fmt.Printf("cgroup received %d packets (%d bytes)\n", packets, bytes) - } + mp := b.Map(mapName) + if err := b.UpdateElement(mp, unsafe.Pointer(&key), unsafe.Pointer(&value), C.BPF_ANY); err != nil { + return err } + return nil } -func updateElements(mp *elf.Map, packets, bytes uint64) error { - if err := b.LookupElement(mp, unsafe.Pointer(&packetsKey), unsafe.Pointer(&packets)); err != nil { - return fmt.Errorf("error looking up in map: %s", err) +func lookup(b *bpf.Module, key uint32) (uint64, error) { + mp := b.Map(mapName) + var value uint64 + if err := b.LookupElement(mp, unsafe.Pointer(&key), unsafe.Pointer(&value)); err != nil { + return 0, err } - - if err := b.LookupElement(mp, unsafe.Pointer(&bytesKey), unsafe.Pointer(&bytes)); err != nil { - return fmt.Errorf("error looking up in map: %s", err) - } - - return nil + return value, nil } diff --git a/bpf/controller.go b/bpf/controller.go new file mode 100644 index 0000000..79aeb3b --- /dev/null +++ b/bpf/controller.go @@ -0,0 +1,88 @@ +/* +Copyright 2017 Kinvolk GmbH + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bpf + +import ( + "context" + "time" + + bpf "github.com/iovisor/gobpf/elf" +) + +/* +#include +*/ +import "C" + +const ( + packetsKey uint32 = 0 + bytesKey uint32 = 1 +) + +type Controller struct { + cgroup string + module *bpf.Module + quit chan struct{} + + packetsHandler func(uint64) error + bytesHandler func(uint64) error +} + +func (c *Controller) Stop() { + c.quit <- struct{}{} + if err := c.module.Close(); err != nil { + Log.Error("error closing bpf.Module", "err", err) + } +} + +func (c *Controller) SetPacketsHandler(h func(uint64) error) { + c.packetsHandler = h +} +func (c *Controller) SetBytesHandler(h func(uint64) error) { + c.bytesHandler = h +} + +func (c *Controller) Run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-c.quit: + return + case <-time.After(1 * time.Second): + // TODO this needs to be solved differently + // Maybe sending new values on individual channels? + packets, err := lookup(c.module, packetsKey) + if err != nil { + Log.Error("lookup failed", "err", err) + continue + } + if err := c.packetsHandler(packets); err != nil { + Log.Error("packetsHandler failed", "err", err) + } + + bytes, err := lookup(c.module, bytesKey) + if err != nil { + Log.Error("lookup failed", "err", err) + continue + } + if err := c.bytesHandler(bytes); err != nil { + Log.Error("bytesHandler failed", "err", err) + } + } + } +} diff --git a/bpf/src/cgnet.c b/bpf/src/cgnet.c index 635d673..c1f9fa0 100644 --- a/bpf/src/cgnet.c +++ b/bpf/src/cgnet.c @@ -20,7 +20,10 @@ #include #include "bpf_helpers.h" -struct bpf_map_def SEC("maps/count") count_map = { +#define PACKETS_KEY 0 +#define BYTES_KEY 1 + +struct bpf_map_def SEC("maps/count_map") count_map = { .type = BPF_MAP_TYPE_ARRAY, .key_size = sizeof(int), .value_size = sizeof(__u64), @@ -29,7 +32,7 @@ struct bpf_map_def SEC("maps/count") count_map = { SEC("cgroup/skb") int count_packets(struct __sk_buff *skb) { - int packets_key = 0, bytes_key = 1; + int packets_key = PACKETS_KEY, bytes_key = BYTES_KEY; __u64 *packets = 0; __u64 *bytes = 0; diff --git a/cmd/export.go b/cmd/export.go new file mode 100644 index 0000000..767b0f1 --- /dev/null +++ b/cmd/export.go @@ -0,0 +1,119 @@ +/* +Copyright 2017 Kinvolk GmbH + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cmd + +import ( + "context" + "fmt" + "os" + "os/signal" + "path" + "strings" + "syscall" + + log "github.com/inconshreveable/log15" + "github.com/spf13/cobra" + + "github.com/kinvolk/cgnet/bpf" + "github.com/kinvolk/cgnet/kube" + "github.com/kinvolk/cgnet/metrics" +) + +var ( + metricsPort int + kubeconfig string +) + +var exportCmd = &cobra.Command{ + Use: "export", + Short: "Serve metrics to Prometheus", + Run: cmdExport, +} + +func cmdExport(cmd *cobra.Command, args []string) { + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + + // needs to be run before any API interaction + cfg, err := kube.BuildConfig(kubeconfig) + if err != nil { + log.Error("error building config", "err", err) + return + } + + cgroupRoot, err := kube.GetCgroupRoot(cfg) + if err != nil { + log.Error("error retrieving cgroup root for cluster", "err", err) + return + } + log.Debug("cgroup root", "path", cgroupRoot) + + events := make(chan kube.Event) + go kube.WatchPodEvents(ctx, cancelFunc, cfg, events) + + addr := fmt.Sprintf(":%d", metricsPort) + go metrics.Serve(ctx, addr) + + term := make(chan os.Signal) + signal.Notify(term, syscall.SIGINT, syscall.SIGTERM) + + var ctlMap map[string]*bpf.Controller + for { + select { + case <-term: + return + case <-ctx.Done(): + return + case e := <-events: + switch e.Type { + case kube.NewPodEvent: + metrics.TotalNum().Add(1) + log.Debug("new pod", "pod", e.PodSelfLink, "cgroup", buildCgroupPath(cgroupRoot, e.PodUID, e.PodQOSClass)) + // FIXME: https://github.com/kinvolk/cgnet/issues/16 + cgPath := buildCgroupPath(cgroupRoot, e.PodUID, e.PodQOSClass) + bpfController, err := bpf.Attach(cgPath) + if err != nil { + continue + } + + bpfController.SetPacketsHandler(func(v uint64) error { + metrics.SetOutgoingPackets(path.Base(e.PodSelfLink), float64(v)) + return nil + }) + go bpfController.Run(ctx) + ctlMap[e.PodUID] = bpfController + + case kube.DeletePodEvent: + metrics.TotalNum().Sub(1) + log.Debug("pod gone", "pod", e.PodSelfLink, "cgroup", buildCgroupPath(cgroupRoot, e.PodUID, e.PodQOSClass)) + metrics.TotalNum().Sub(1) + ctlMap[e.PodUID].Stop() + } + } + } +} + +func buildCgroupPath(root, uid, qosclass string) string { + return fmt.Sprintf("%s/%s/pod%s", root, strings.ToLower(qosclass), uid) +} + +func init() { + RootCmd.AddCommand(exportCmd) + + exportCmd.Flags().IntVarP(&metricsPort, "port", "p", 9101, "metrics port") + exportCmd.Flags().StringVarP(&kubeconfig, "kubeconfig", "k", "", "path to kubeconfig file. Only required if out-of-cluster.") +} diff --git a/cmd/root.go b/cmd/root.go index 175aee6..e8db6f8 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,9 +1,12 @@ /* Copyright 2017 Kinvolk GmbH + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,8 +23,10 @@ import ( "github.com/spf13/cobra" ) -var version string -var printVersion bool +var ( + version string + printVersion bool +) var RootCmd = &cobra.Command{ Use: "cgnet", diff --git a/cmd/serve.go b/cmd/serve.go deleted file mode 100644 index 5b9df29..0000000 --- a/cmd/serve.go +++ /dev/null @@ -1,86 +0,0 @@ -/* -Copyright 2017 Kinvolk GmbH -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cmd - -import ( - "context" - "fmt" - "os" - "os/signal" - "syscall" - - log "github.com/inconshreveable/log15" - "github.com/spf13/cobra" - - "github.com/kinvolk/cgnet/kube" - "github.com/kinvolk/cgnet/metrics" -) - -var ( - metricsPort int - kubeconfig string -) - -var serveCmd = &cobra.Command{ - Use: "serve", - Short: "Serve metrics to Prometheus", - Run: cmdServe, -} - -func cmdServe(cmd *cobra.Command, args []string) { - ctx, cancelFunc := context.WithCancel(context.Background()) - defer cancelFunc() - - cfg, err := kube.BuildConfig(kubeconfig) - if err != nil { - log.Error("error building config", "err", err) - return - } - - events := make(chan kube.Event) - go kube.WatchPodEvents(ctx, cancelFunc, cfg, events) - - addr := fmt.Sprintf(":%d", metricsPort) - go metrics.Serve(ctx, addr) - - // TODO - // * install bpf program on every 'new pod' event - // * query the bpf maps to retrieve data - // * update podmetrics with data - - term := make(chan os.Signal) - signal.Notify(term, syscall.SIGINT, syscall.SIGTERM) - - for { - select { - case <-term: - return - case <-ctx.Done(): - return - case e := <-events: - switch e { - case kube.NewPodEvent: - metrics.GlobalPodMetrics.TotalNumberPods.Add(1) - case kube.DeletePodEvent: - metrics.GlobalPodMetrics.TotalNumberPods.Sub(1) - } - } - } -} - -func init() { - RootCmd.AddCommand(serveCmd) - serveCmd.Flags().IntVarP(&metricsPort, "port", "p", 9101, "metrics port") - serveCmd.Flags().StringVarP(&kubeconfig, "kubeconfig", "k", "", "path to kubeconfig file. Only required if out-of-cluster.") -} diff --git a/cmd/top.go b/cmd/top.go index 633263e..54d7d71 100644 --- a/cmd/top.go +++ b/cmd/top.go @@ -1,9 +1,12 @@ /* Copyright 2017 Kinvolk GmbH + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,17 +17,21 @@ limitations under the License. package cmd import ( + "context" "fmt" "os" + "os/signal" + "syscall" + "time" "github.com/kinvolk/cgnet/bpf" "github.com/spf13/cobra" ) var topCmd = &cobra.Command{ - Use: "top [CGROUP...]", - Short: "Show network stats per control group", - Run: cmdTop, + Use: "top [CGROUP...]", + Short: "Show network stats per control group", + Run: cmdTop, } func cmdTop(cmd *cobra.Command, args []string) { @@ -33,17 +40,39 @@ func cmdTop(cmd *cobra.Command, args []string) { os.Exit(0) } - if err := bpf.Setup(args[0]); err != nil { + cgPath := args[0] + + ctl, err := bpf.Attach(cgPath) + if err != nil { fmt.Fprintln(os.Stderr, "Unable to load bpf:", err) os.Exit(1) } - quit := make(chan struct{}) - defer close(quit) + var packets, bytes uint64 + ctl.SetPacketsHandler(func(v uint64) error { + packets = v + return nil + }) + ctl.SetBytesHandler(func(v uint64) error { + bytes = v + return nil + }) - if err := bpf.UpdateLoop(quit); err != nil { - fmt.Fprintln(os.Stderr, "Unable to load bpf:", err) - os.Exit(1) + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + go ctl.Run(ctx) + defer ctl.Stop() + + term := make(chan os.Signal) + signal.Notify(term, syscall.SIGINT, syscall.SIGTERM) + + for { + select { + case <-term: + return + case <-time.After(1 * time.Second): + fmt.Fprintf(os.Stdout, "\rcgroup received %d packets (%d bytes)", packets, bytes) + } } } diff --git a/kube/clusterconfig.go b/kube/clusterconfig.go new file mode 100644 index 0000000..a093177 --- /dev/null +++ b/kube/clusterconfig.go @@ -0,0 +1,45 @@ +/* +Copyright 2017 Kinvolk GmbH + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kube + +import ( + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +const DefaultCgroupRoot string = "/sys/fs/cgroup/systemd/kubepods" + +func BuildConfig(kubeconfig string) (*rest.Config, error) { + if kubeconfig != "" { + return clientcmd.BuildConfigFromFlags("", kubeconfig) + } + return rest.InClusterConfig() +} + +// We are using the default root for now. +// It looks like querying of the kubelet config is part of +// this PR for dynamic kubelet configuration: +// +// https://github.com/kubernetes/features/issues/281 +// https://github.com/kubernetes/kubernetes/pull/46254 +// +// TODO: investigate ^ or +// https://github.com/kubernetes/community/blob/master/contributors/design-proposals/dynamic-kubelet-configuration.md#monitoring-configuration-status +// there is some mention of the `configz` endpoint +func GetCgroupRoot(_ *rest.Config) (string, error) { + return DefaultCgroupRoot, nil +} diff --git a/kube/events.go b/kube/events.go index 331d83c..98fd308 100644 --- a/kube/events.go +++ b/kube/events.go @@ -1,59 +1,71 @@ +/* +Copyright 2017 Kinvolk GmbH + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package kube -import ( - log "github.com/inconshreveable/log15" - "k8s.io/client-go/pkg/api/v1" -) +import "k8s.io/client-go/pkg/api/v1" -type Event int +type EventType int const ( - NewPodEvent Event = iota + NewPodEvent EventType = iota UpdatePodEvent DeletePodEvent ) +type Event struct { + Type EventType + PodUID string + PodSelfLink string + PodQOSClass string +} + func onAdd(events chan Event) func(obj interface{}) { - funclog := log.New("func", "onAdd") return func(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { - funclog.Error("unable to assert type") return } - events <- NewPodEvent - funclog.Info(pod.ObjectMeta.SelfLink) + events <- Event{Type: NewPodEvent, PodUID: string(pod.UID), PodSelfLink: pod.SelfLink, PodQOSClass: string(pod.Status.QOSClass)} + return } } func onUpdate(events chan Event) func(oldObj, newObj interface{}) { - // funclog := log.New("func", "onUpdate") return func(oldObj, newObj interface{}) { // do nothing return // oldPod, ok := oldObj.(*v1.Pod) // if !ok { - // funclog.Error("unable to assert type") // return // } // newPod, ok := newObj.(*v1.Pod) // if !ok { - // funclog.Error("unable to assert type") // return // } - // funclog.Info(fmt.Sprintf("old: %s, new: %s", oldPod.ObjectMeta.SelfLink, newPod.ObjectMeta.SelfLink)) } } func onDelete(events chan Event) func(obj interface{}) { - funclog := log.New("func", "onDelete") return func(obj interface{}) { pod, ok := obj.(*v1.Pod) if !ok { - funclog.Error("unable to assert type") return } - events <- DeletePodEvent - funclog.Info(pod.ObjectMeta.SelfLink) + events <- Event{Type: DeletePodEvent, PodUID: string(pod.UID), PodSelfLink: pod.SelfLink, PodQOSClass: string(pod.Status.QOSClass)} + return } } diff --git a/kube/pods.go b/kube/kube.go similarity index 77% rename from kube/pods.go rename to kube/kube.go index d1715a5..c6f67a8 100644 --- a/kube/pods.go +++ b/kube/kube.go @@ -1,9 +1,12 @@ /* Copyright 2017 Kinvolk GmbH + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,15 +20,11 @@ import ( "context" "time" - // "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/pkg/api/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/client-go/tools/clientcmd" - "k8s.io/client-go/pkg/api/v1" - - log "github.com/inconshreveable/log15" ) func WatchPodEvents(ctx context.Context, cancelFunc context.CancelFunc, cfg *rest.Config, events chan Event) { @@ -36,13 +35,11 @@ func WatchPodEvents(ctx context.Context, cancelFunc context.CancelFunc, cfg *res } <-ctx.Done() - log.Info("stopped watching events") } func watchCustomResources(ctx context.Context, cfg *rest.Config, events chan Event) (cache.Controller, error) { clientset, err := kubernetes.NewForConfig(cfg) if err != nil { - log.Error("error creating configset", "err", err) return nil, err } @@ -60,15 +57,6 @@ func watchCustomResources(ctx context.Context, cfg *rest.Config, events chan Eve ) go k8sController.Run(ctx.Done()) - log.Info("started watching events", "resource", v1.ResourcePods) return k8sController, nil } - -func BuildConfig(kubeconfig string) (*rest.Config, error) { - if kubeconfig != "" { - return clientcmd.BuildConfigFromFlags("", kubeconfig) - } - log.Warn("assume running inside k8s cluster") - return rest.InClusterConfig() -} diff --git a/manifests/deploy/Makefile b/manifests/deploy/Makefile index 3983e19..4487bb5 100644 --- a/manifests/deploy/Makefile +++ b/manifests/deploy/Makefile @@ -4,11 +4,11 @@ TARGET:=all-in-one.yaml all: $(TARGET) $(TARGET): - @cat cgnet-exporter-ds.yaml > $@ + cat cgnet-exporter-ds.yaml > $@ @echo "---" >> $@ - @cat cgnet-exporter-svc.yaml >> $@ + cat cgnet-exporter-svc.yaml >> $@ @echo "---" >> $@ - @cat cgnet-exporter-svc-monitor.yaml >> $@ + cat cgnet-exporter-svc-monitor.yaml >> $@ clean: $(TARGET) rm -rf $< diff --git a/manifests/deploy/all-in-one.yaml b/manifests/deploy/all-in-one.yaml index a4f74be..bdfcfac 100644 --- a/manifests/deploy/all-in-one.yaml +++ b/manifests/deploy/all-in-one.yaml @@ -20,6 +20,9 @@ spec: - containerPort: 9101 hostPort: 9101 name: scrape + securityContext: + privileged: true + --- apiVersion: v1 kind: Service diff --git a/manifests/deploy/cgnet-exporter-ds.yaml b/manifests/deploy/cgnet-exporter-ds.yaml index 91b50af..5f0656f 100644 --- a/manifests/deploy/cgnet-exporter-ds.yaml +++ b/manifests/deploy/cgnet-exporter-ds.yaml @@ -20,3 +20,5 @@ spec: - containerPort: 9101 hostPort: 9101 name: scrape + securityContext: + privileged: true diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 0000000..e96d9b4 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,52 @@ +/* +Copyright 2017 Kinvolk GmbH + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import ( + "context" + "net/http" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const namespace string = "cgnet_pod" + +func init() { + prometheus.MustRegister(globalPodMetrics.TotalNumberPods) + prometheus.MustRegister(globalPodMetrics.IncomingPackets) + prometheus.MustRegister(globalPodMetrics.OutgoingPackets) +} + +func Serve(ctx context.Context, addr string) { + http.Handle("/metrics", promhttp.Handler()) + srv := http.Server{ + Addr: addr, + Handler: http.DefaultServeMux, + } + go srv.ListenAndServe() + + <-ctx.Done() + + toCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second) + defer cancelFunc() + + if err := srv.Shutdown(toCtx); err != nil { + panic(err) + } +} diff --git a/metrics/prom.go b/metrics/net.go similarity index 59% rename from metrics/prom.go rename to metrics/net.go index 063bfe7..acb8664 100644 --- a/metrics/prom.go +++ b/metrics/net.go @@ -1,9 +1,12 @@ /* Copyright 2017 Kinvolk GmbH + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -13,17 +16,7 @@ limitations under the License. package metrics -import ( - "context" - "net/http" - "time" - - log "github.com/inconshreveable/log15" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" -) - -const namespace string = "cgnet_pod" +import "github.com/prometheus/client_golang/prometheus" type PodMetrics struct { TotalNumberPods prometheus.Gauge @@ -32,7 +25,7 @@ type PodMetrics struct { // ... } -var GlobalPodMetrics = PodMetrics{ +var globalPodMetrics = PodMetrics{ TotalNumberPods: prometheus.NewGauge( prometheus.GaugeOpts{ Name: "total_number_pods", @@ -58,29 +51,14 @@ var GlobalPodMetrics = PodMetrics{ ), } -func init() { - prometheus.MustRegister(GlobalPodMetrics.TotalNumberPods) - prometheus.MustRegister(GlobalPodMetrics.IncomingPackets) - prometheus.MustRegister(GlobalPodMetrics.OutgoingPackets) +func TotalNum() prometheus.Gauge { + return globalPodMetrics.TotalNumberPods } -func Serve(ctx context.Context, addr string) { - http.Handle("/metrics", promhttp.Handler()) - srv := http.Server{ - Addr: addr, - Handler: http.DefaultServeMux, - } - go srv.ListenAndServe() - - log.Info("serving metrics", "addr", addr) - <-ctx.Done() - - toCtx, cancelFunc := context.WithTimeout(ctx, 2*time.Second) - defer cancelFunc() +func SetOutgoingPackets(pod string, packets float64) { + globalPodMetrics.OutgoingPackets.With(prometheus.Labels{"pod_name": pod}).Add(packets) +} - log.Info("waiting for server shutdown") - if err := srv.Shutdown(toCtx); err != nil { - panic(err) - } - log.Info("server stopped") +func SetIncomingPackets(pod string, packets float64) { + globalPodMetrics.IncomingPackets.With(prometheus.Labels{"pod_name": pod}).Add(packets) }