Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics proxy for collecting metrics from mounted processes #78

Merged
merged 8 commits into from
Feb 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#
IMAGE=juicedata/juicefs-csi-driver
REGISTRY=docker.io
VERSION=$(shell git describe --tags --always --dirty)
VERSION=$(shell git describe --tags --match 'v*' --always --dirty)
GIT_BRANCH?=$(shell git rev-parse --abbrev-ref HEAD)
GIT_COMMIT?=$(shell git rev-parse HEAD)
DEV_TAG=dev-$(shell git describe --always --dirty)
Expand Down
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ require (
github.com/kubernetes-csi/csi-test v1.1.0
github.com/onsi/ginkgo v1.7.0
github.com/onsi/gomega v1.4.3
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.17.0
github.com/spf13/afero v1.1.2 // indirect
golang.org/x/net v0.0.0-20190313220215-9f648a60d977 // indirect
google.golang.org/genproto v0.0.0-20181202183823-bd91e49a0898 // indirect
google.golang.org/grpc v1.17.0
gopkg.in/yaml.v2 v2.2.2 // indirect
golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d // indirect
google.golang.org/grpc v1.26.0
google.golang.org/protobuf v1.23.0
k8s.io/apimachinery v0.0.0-20190416092415-3370b4aef5d6 // indirect
k8s.io/klog v0.1.0
k8s.io/kubernetes v1.13.1
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7 // indirect
)

go 1.13
392 changes: 355 additions & 37 deletions go.sum

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pkg/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,16 @@ func (d *controllerService) DeleteVolume(ctx context.Context, req *csi.DeleteVol
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not mount juicefs: %v", err)
}
defer d.juicefs.JfsUnmount(volumeID)

klog.V(5).Infof("DeleteVolume: Deleting volume %q", volumeID)
err = jfs.DeleteVol(volumeID)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not delete volume: %q", volumeID)
}
delete(d.vols, volumeID)
if err = d.juicefs.JfsUnmount(jfs.GetBasePath()); err != nil {
return nil, status.Errorf(codes.Internal, "Could not unmount volume %q: %v", volumeID, err)
}
return &csi.DeleteVolumeResponse{}, nil
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ func newNodeService(nodeID string) nodeService {
}
klog.V(4).Infof("Node: %s", stdoutStderr)

go func() {
klog.V(4).Info("Serve metrics on :9560")
jfsProvider.ServeMetrics(9560)
}()

return nodeService{
juicefs: jfsProvider,
nodeID: nodeID,
Expand Down Expand Up @@ -174,7 +179,7 @@ func (d *nodeService) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
// since the PVC might be used by more than one container
if err == nil && len(refs) == 1 {
klog.V(5).Infof("NodeUnpublishVolume: unmounting ref %s", refs[0])
if err := d.juicefs.Unmount(refs[0]); err != nil {
if err := d.juicefs.JfsUnmount(refs[0]); err != nil {
klog.V(5).Infof("NodeUnpublishVolume: error unmounting mount ref %s, %v", refs[0], err)
}
}
Expand Down
62 changes: 48 additions & 14 deletions pkg/juicefs/juicefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/exec"
"path/filepath"
Expand All @@ -30,14 +31,16 @@ const (
type Interface interface {
mount.Interface
JfsMount(volumeID string, secrets map[string]string, options []string) (Jfs, error)
JfsUnmount(volumeID string)
JfsUnmount(mountPath string) error
AuthFs(secrets map[string]string) ([]byte, error)
MountFs(volumeID, source string, options []string) (string, error)
Version() ([]byte, error)
ServeMetrics(port int)
}

type juicefs struct {
mount.SafeFormatAndMount
metricsProxy
}

var _ Interface = &juicefs{}
Expand Down Expand Up @@ -105,7 +108,7 @@ func NewJfsProvider(mounter *mount.SafeFormatAndMount) (Interface, error) {
}
}

return &juicefs{*mounter}, nil
return &juicefs{*mounter, *newMetricsProxy()}, nil
}

func (j *juicefs) IsNotMountPoint(dir string) (bool, error) {
Expand All @@ -114,30 +117,41 @@ func (j *juicefs) IsNotMountPoint(dir string) (bool, error) {

// JfsMount auths and mounts JuiceFS
func (j *juicefs) JfsMount(volumeID string, secrets map[string]string, options []string) (Jfs, error) {
source, ok := secrets["metaurl"]
if !ok {
source, isCe := secrets["metaurl"]
var mountPath string
if !isCe {
j.Upgrade()
stdoutStderr, err := j.AuthFs(secrets)
klog.V(5).Infof("MountFs: authentication output is '%s'\n", stdoutStderr)
klog.V(5).Infof("JfsMount: authentication output is '%s'\n", stdoutStderr)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not auth juicefs: %v", err)
}
source = secrets["name"]
mountPath, err = j.MountFs(volumeID, source, options)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not mount juicefs: %v", err)
}
} else {
stdoutStderr, err := j.ceFormat(secrets)
klog.V(5).Infof("MountFs: format output is '%s'\n", stdoutStderr)
klog.V(5).Infof("JfsMount: format output is '%s'\n", stdoutStderr)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not format juicefs: %v", err)
}
// Default use redis:// scheme
if !strings.Contains(source, "://") {
source = "redis://" + source
}
}

mountPath, err := j.MountFs(volumeID, source, options)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not mount juicefs: %v", err)
metricsPort, err := getFreePort()
if err != nil {
klog.V(5).Infof("getFreePort error: %q", err)
} else {
options = append(options, fmt.Sprintf("metrics=localhost:%d", metricsPort))
}
mountPath, err = j.MountFs(volumeID, source, options)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not mount juicefs: %v", err)
}
j.ceCheckMetrics(secrets["name"], mountPath, metricsPort)
}

return &jfs{
Expand All @@ -148,12 +162,15 @@ func (j *juicefs) JfsMount(volumeID string, secrets map[string]string, options [
}, nil
}

func (j *juicefs) JfsUnmount(volumeID string) {
mountPath := filepath.Join(mountBase, volumeID)
func (j *juicefs) JfsUnmount(mountPath string) (err error) {
klog.V(5).Infof("JfsUnmount: umount %s", mountPath)
if err := j.Unmount(mountPath); err != nil {
if err = j.Unmount(mountPath); err != nil {
klog.V(5).Infof("JfsUnmount: error umount %s, %v", mountPath, err)
}
j.mpLock.Lock()
delete(j.mountedFs, mountPath)
j.mpLock.Unlock()
return
}

func (j *juicefs) RmrDir(directory string) ([]byte, error) {
Expand Down Expand Up @@ -380,6 +397,7 @@ func (j *juicefs) ceFormat(secrets map[string]string) ([]byte, error) {
func (j *juicefs) ceMount(source string, mountPath string, fsType string, options []string) error {
klog.V(5).Infof("ceMount: mount %v at %v", source, mountPath)
mountArgs := []string{source, mountPath}

if len(options) > 0 {
mountArgs = append(mountArgs, "-o", strings.Join(options, ","))
}
Expand Down Expand Up @@ -427,3 +445,19 @@ func (j *juicefs) ceMount(source string, mountPath string, fsType string, option
}
return status.Errorf(codes.Internal, "Mount %v at %v failed: mount isn't ready in 30 seconds", source, mountPath)
}

// ceCheckMetrics records the metrics endpoint of a freshly mounted
// community-edition file system so the metrics proxy can scrape it.
//
// name is the volume name reported on proxied metrics, mountPath is the key
// under which the mount is tracked, and metricsPort is the localhost port the
// mount process was asked to serve metrics on.
//
// A non-positive metricsPort means the caller could not reserve a port and
// therefore mounted without a metrics listener; registering such a mount would
// only make every proxy scrape attempt a request to an invalid endpoint, so it
// is skipped.
func (j *juicefs) ceCheckMetrics(name, mountPath string, metricsPort int) {
	if metricsPort <= 0 {
		klog.V(5).Infof("ceCheckMetrics: no metrics port for %s, skip metrics registration", mountPath)
		return
	}

	info := &mountInfo{
		Name:        name,
		MetricsPort: metricsPort,
	}

	j.mpLock.Lock()
	defer j.mpLock.Unlock()
	j.mountedFs[mountPath] = info
}

// ServeMetrics starts an HTTP server on the given port that exposes the
// aggregated metrics of all tracked mounts at /metrics. It blocks until the
// server stops, so callers are expected to run it in its own goroutine.
//
// The handler is registered on a dedicated ServeMux instead of
// http.DefaultServeMux: registering the same pattern twice on the default mux
// panics, and the default mux would also expose any handler another package
// registered on it.
func (j *juicefs) ServeMetrics(port int) {
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", j.serveMetricsHTTP)

	server := &http.Server{
		Addr:    fmt.Sprintf(":%d", port),
		Handler: mux,
	}
	// ListenAndServe always returns a non-nil error; surface it at error level
	// so a failed bind is visible at default verbosity.
	if err := server.ListenAndServe(); err != nil {
		klog.Errorf("Start metrics server :%d failed: %q", port, err)
	}
}
167 changes: 167 additions & 0 deletions pkg/juicefs/metric_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package juicefs

import (
"fmt"
"io"
"k8s.io/klog"
"net"
"net/http"
"sort"
"sync"
"time"

dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"
"google.golang.org/protobuf/proto"
)

// metricsProxy scrapes the metrics endpoints of mounted file systems and
// re-exposes them through a single HTTP handler (serveMetricsHTTP).
// It contains a sync.RWMutex, so a value should not be copied once in use.
type metricsProxy struct {
// client is the HTTP client used to scrape each mount's metrics endpoint.
client *http.Client
// mpLock guards mountedFs.
mpLock sync.RWMutex
// mountedFs maps a mount path to the information needed to scrape that mount.
mountedFs map[string]*mountInfo
}

// newMetricsProxy builds an empty proxy: no mounts are registered yet and
// scrapes go through an HTTP client with a 10 second timeout.
func newMetricsProxy() *metricsProxy {
	proxy := new(metricsProxy)
	proxy.client = &http.Client{Timeout: 10 * time.Second}
	proxy.mountedFs = map[string]*mountInfo{}
	return proxy
}

// mountInfo describes one mounted file system whose metrics are proxied.
type mountInfo struct {
// Name is attached as the "vol_name" label to every proxied metric.
Name string
// MetricsPort is the localhost TCP port whose /metrics endpoint is scraped.
MetricsPort int
}

// serveMetricsHTTP answers a scrape of the proxy endpoint. It scrapes the
// metrics endpoint of every registered mount concurrently, tags each metric
// with the "vol_name" and "mp" labels of the mount it came from, and writes the
// combined result in the exposition format negotiated from the request.
//
// Mounts whose scrape fails are logged and left out of the response.
//
// Families with the same name coming from different mounts are merged into a
// single family before encoding: the Prometheus text format allows only one
// HELP/TYPE header per metric name and requires all samples of a metric to form
// one group. Mounts and family names are sorted so the output is stable
// between requests.
func (p *metricsProxy) serveMetricsHTTP(w http.ResponseWriter, req *http.Request) {
	type scrapeTarget struct {
		mountPath string
		info      *mountInfo
	}

	// Snapshot the mount table so the lock is not held during network I/O.
	p.mpLock.RLock()
	targets := make([]scrapeTarget, 0, len(p.mountedFs))
	for mp, info := range p.mountedFs {
		targets = append(targets, scrapeTarget{mountPath: mp, info: info})
	}
	p.mpLock.RUnlock()

	sort.Slice(targets, func(a, b int) bool {
		return targets[a].mountPath < targets[b].mountPath
	})

	// Every goroutine writes only to its own slot, so no further
	// synchronisation than the WaitGroup is needed.
	scraped := make([][]*dto.MetricFamily, len(targets))
	var wg sync.WaitGroup
	for i, t := range targets {
		wg.Add(1)
		go func(i int, t scrapeTarget) {
			defer wg.Done()
			endpoint := fmt.Sprintf("http://localhost:%d/metrics", t.info.MetricsPort)
			metricFamilies, err := p.scrape(endpoint)
			if err != nil {
				klog.V(5).Infof("Scrape metrics from %s error: %q", endpoint, err)
				return
			}
			labels := model.LabelSet{
				"vol_name": model.LabelValue(t.info.Name),
				"mp":       model.LabelValue(t.mountPath),
			}
			rewriteMetrics(labels, metricFamilies)
			scraped[i] = metricFamilies
		}(i, t)
	}
	wg.Wait()

	// Merge families by name; the first occurrence keeps its HELP/TYPE and
	// collects the metrics of all later occurrences.
	merged := make(map[string]*dto.MetricFamily)
	names := make([]string, 0)
	for _, families := range scraped {
		for _, mf := range families {
			name := mf.GetName()
			if first, ok := merged[name]; ok {
				first.Metric = append(first.Metric, mf.Metric...)
				continue
			}
			merged[name] = mf
			names = append(names, name)
		}
	}
	sort.Strings(names)

	contentType := expfmt.Negotiate(req.Header)
	// Declare the negotiated format; otherwise net/http sniffs the body and a
	// non-text encoding would be served with a misleading Content-Type.
	w.Header().Set("Content-Type", string(contentType))
	encoder := expfmt.NewEncoder(w, contentType)
	for _, name := range names {
		if err := encoder.Encode(merged[name]); err != nil {
			http.Error(w, "An error has occurred during metrics encoding:\n\n"+err.Error(), http.StatusInternalServerError)
			return
		}
	}
}

// scrape issues a GET request to address with the proxy's HTTP client and
// decodes the response body into metric families, using the exposition format
// announced by the response headers. Any status other than 200 OK is reported
// as an error.
func (p *metricsProxy) scrape(address string) ([]*dto.MetricFamily, error) {
	resp, err := p.client.Get(address)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close() // nolint: errcheck

	if resp.StatusCode == http.StatusOK {
		return decodeMetrics(resp.Body, expfmt.ResponseFormat(resp.Header))
	}
	return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
}

// getFreePort asks the kernel for an unused TCP port on localhost by binding
// to port 0 and reading back the port that was assigned.
//
// The listener is closed before returning, so the port is only known to have
// been free at the time of the call; nothing reserves it for the caller.
func getFreePort() (int, error) {
	loopback, err := net.ResolveTCPAddr("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}

	listener, err := net.ListenTCP("tcp", loopback)
	if err != nil {
		return 0, err
	}

	port := listener.Addr().(*net.TCPAddr).Port
	_ = listener.Close() // best effort, the port number is already known
	return port, nil
}

// decodeMetrics reads metric families from reader, encoded in the given
// exposition format, until the stream ends. A clean end of stream (io.EOF) is
// not an error. On any other decode error the families decoded so far are
// returned together with that error.
func decodeMetrics(reader io.Reader, format expfmt.Format) ([]*dto.MetricFamily, error) {
	families := make([]*dto.MetricFamily, 0)
	dec := expfmt.NewDecoder(reader, format)
	for {
		family := new(dto.MetricFamily)
		switch err := dec.Decode(family); err {
		case nil:
			families = append(families, family)
		case io.EOF:
			return families, nil
		default:
			return families, err
		}
	}
}

// rewriteMetrics attaches the given labels to every metric of every family,
// modifying the metrics in place. When a metric already carries a label with
// the same name, the value from labels replaces it.
func rewriteMetrics(labels model.LabelSet, metricFamilies []*dto.MetricFamily) {
	for _, family := range metricFamilies {
		for _, metric := range family.Metric {
			metric.Label = mergeLabelPairs(metric.Label, labels)
		}
	}
}

// mergeLabelPairs returns a new label pair list holding the union of pairs and
// extra, sorted by label name. Pairs without a name are dropped; on a name
// clash the value from extra is used.
func mergeLabelPairs(pairs []*dto.LabelPair, extra model.LabelSet) []*dto.LabelPair {
	current := make(model.LabelSet, len(pairs))
	for _, pair := range pairs {
		if pair.Name == nil {
			continue
		}
		current[model.LabelName(pair.GetName())] = model.LabelValue(pair.GetValue())
	}

	combined := current.Merge(extra)
	names := make(model.LabelNames, 0, len(combined))
	for name := range combined {
		names = append(names, name)
	}
	sort.Sort(names)

	result := make([]*dto.LabelPair, len(names))
	for i, name := range names {
		result[i] = &dto.LabelPair{
			Name:  proto.String(string(name)),
			Value: proto.String(string(combined[name])),
		}
	}
	return result
}
6 changes: 5 additions & 1 deletion tests/sanity/fake_juicefs_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func (j *fakeJfsProvider) JfsMount(volumeID string, secrets map[string]string, o
return &fs, nil
}

func (j *fakeJfsProvider) JfsUnmount(volumeID string) {}
// JfsUnmount is the test double for unmounting: it does nothing and always
// reports success.
func (j *fakeJfsProvider) JfsUnmount(mountPath string) error {
return nil
}

func (j *fakeJfsProvider) AuthFs(secrets map[string]string) ([]byte, error) {
return []byte{}, nil
Expand All @@ -66,6 +68,8 @@ func (j *fakeJfsProvider) Version() ([]byte, error) {
return []byte{}, nil
}

// ServeMetrics is a no-op in the fake provider; no metrics server is started.
func (j *fakeJfsProvider) ServeMetrics(port int) {}

func newFakeJfsProvider() *fakeJfsProvider {
return &fakeJfsProvider{
fs: map[string]fakeJfs{},
Expand Down