Skip to content

Commit

Permalink
Add sending events to k8s
Browse files Browse the repository at this point in the history
Signed-off-by: Bala.FA <bala.gluster@gmail.com>
  • Loading branch information
balamurugana authored and wlan0 committed Aug 17, 2021
1 parent ac2fa99 commit 4c38cef
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 29 deletions.
9 changes: 5 additions & 4 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ import (
"github.com/minio/direct-csi/pkg/clientset"
"github.com/minio/direct-csi/pkg/utils"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"k8s.io/klog/v2"
)

/* Volume Lifecycle
Expand Down Expand Up @@ -207,7 +206,7 @@ func (c *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
return nil, err
}

selectedDrive, err := FilterDrivesByTopologyRequirements(req, filteredDrives)
selectedDrive, err := FilterDrivesByTopologyRequirements(req, filteredDrives, c.NodeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -351,6 +350,8 @@ func (c *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolu
return nil, err
}

utils.Eventf(vol, corev1.EventTypeNormal, "directcsi-controller", "volume %v created", vol.Name)

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: name,
Expand Down
5 changes: 4 additions & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (
runtime "k8s.io/apimachinery/pkg/runtime"
)

func init() {
utils.FakeInit()
}

const (
KB = 1 << 10
MB = KB << 10
Expand Down Expand Up @@ -781,7 +785,6 @@ func createFakeController() *ControllerServer {
}

func TestCreateAndDeleteVolumeRPCs(t *testing.T) {

getTopologySegmentsForNode := func(node string) map[string]string {
switch node {
case "N1":
Expand Down
7 changes: 5 additions & 2 deletions pkg/controller/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"crypto/rand"
"fmt"
"math/big"
"sort"

Expand Down Expand Up @@ -133,7 +134,7 @@ func FilterDrivesByAccessTier(accessTier directcsi.AccessTier, csiDrives []direc
}

// FilterDrivesByTopologyRequirements - selects the CSI drive by topology in the create volume request
func FilterDrivesByTopologyRequirements(volReq *csi.CreateVolumeRequest, csiDrives []directcsi.DirectCSIDrive) (directcsi.DirectCSIDrive, error) {
func FilterDrivesByTopologyRequirements(volReq *csi.CreateVolumeRequest, csiDrives []directcsi.DirectCSIDrive, nodeID string) (directcsi.DirectCSIDrive, error) {
tReq := volReq.GetAccessibilityRequirements()

preferredXs := tReq.GetPreferred()
Expand Down Expand Up @@ -163,7 +164,9 @@ func FilterDrivesByTopologyRequirements(volReq *csi.CreateVolumeRequest, csiDriv
"requisiteTopology", requisiteXs,
)

return directcsi.DirectCSIDrive{}, status.Error(codes.ResourceExhausted, "Cannot satisfy the topology constraint")
message := fmt.Sprintf("No suitable drive found on node %v for %v. ", nodeID, volReq.GetName()) +
"Use nodeSelector or affinity to restrict pods to run on node with enough capacity"
return directcsi.DirectCSIDrive{}, status.Error(codes.ResourceExhausted, message)
}

func selectDriveByFreeCapacity(csiDrives []directcsi.DirectCSIDrive) (directcsi.DirectCSIDrive, error) {
Expand Down
46 changes: 46 additions & 0 deletions pkg/utils/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// This file is part of MinIO Direct CSI
// Copyright (c) 2021 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package utils

import (
v1 "k8s.io/api/core/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)

var (
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
)

func initEvent(kubeClient kubernetes.Interface) {
eventBroadcaster = record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(
&corev1.EventSinkImpl{
Interface: kubeClient.CoreV1().Events(""),
},
)
eventRecorder = eventBroadcaster.NewRecorder(
Scheme, v1.EventSource{Component: "directcsi-controller"},
)
}

func Eventf(object runtime.Object, eventType, reason, messageFmt string, args ...interface{}) {
eventRecorder.Eventf(object, eventType, reason, messageFmt, args...)
}
44 changes: 44 additions & 0 deletions pkg/utils/fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// This file is part of MinIO Direct CSI
// Copyright (c) 2021 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package utils

import (
clientsetfake "github.com/minio/direct-csi/pkg/clientset/fake"
directcsifake "github.com/minio/direct-csi/pkg/clientset/typed/direct.csi.min.io/v1beta2/fake"

apiextensionsv1fake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
discoveryfake "k8s.io/client-go/discovery/fake"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
metadatafake "k8s.io/client-go/metadata/fake"
)

func FakeInit() {
kubeClient = kubernetesfake.NewSimpleClientset()
directCSIClient = &directcsifake.FakeDirectV1beta2{}
directClientset = clientsetfake.NewSimpleClientset()
apiextensionsClient = &apiextensionsv1fake.FakeApiextensionsV1{}
crdClient = apiextensionsClient.CustomResourceDefinitions()
discoveryClient = &discoveryfake.FakeDiscovery{}

scheme := runtime.NewScheme()
metav1.AddMetaToScheme(scheme)
metadataClient = metadatafake.NewSimpleMetadataClient(scheme)

initEvent(kubeClient)
}
2 changes: 2 additions & 0 deletions pkg/utils/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ func Init() {
fmt.Printf("%s: could not initialize metadata client: err=%v\n", Bold("Error"), err)
os.Exit(1)
}

initEvent(kubeClient)
}

func SetDirectCSIClient(fakeClient *directcsifake.FakeDirectV1beta2) {
Expand Down
23 changes: 1 addition & 22 deletions pkg/utils/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,6 @@

package utils

import (
clientsetfake "github.com/minio/direct-csi/pkg/clientset/fake"
directcsifake "github.com/minio/direct-csi/pkg/clientset/typed/direct.csi.min.io/v1beta2/fake"

apiextensionsv1fake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
discoveryfake "k8s.io/client-go/discovery/fake"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
metadatafake "k8s.io/client-go/metadata/fake"
)

func init() {
kubeClient = kubernetesfake.NewSimpleClientset()
directCSIClient = &directcsifake.FakeDirectV1beta2{}
directClientset = clientsetfake.NewSimpleClientset()
apiextensionsClient = &apiextensionsv1fake.FakeApiextensionsV1{}
crdClient = apiextensionsClient.CustomResourceDefinitions()
discoveryClient = &discoveryfake.FakeDiscovery{}

scheme := runtime.NewScheme()
metav1.AddMetaToScheme(scheme)
metadataClient = metadatafake.NewSimpleMetadataClient(scheme)
FakeInit()
}
56 changes: 56 additions & 0 deletions pkg/utils/scheme.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// This file is part of MinIO Direct CSI
// Copyright (c) 2021 MinIO, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package utils

import (
directcsi "github.com/minio/direct-csi/pkg/apis/direct.csi.min.io/v1beta2"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema"
serializer "k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
kubernetesscheme "k8s.io/client-go/kubernetes/scheme"
)

var Scheme = runtime.NewScheme()
var Codecs = serializer.NewCodecFactory(Scheme)
var ParameterCodec = runtime.NewParameterCodec(Scheme)
var localSchemeBuilder = runtime.SchemeBuilder{
kubernetesscheme.AddToScheme,
directcsi.AddToScheme,
}

// AddToScheme adds all types of this clientset into the given scheme. This allows composition
// of clientsets, like in:
//
// import (
// "k8s.io/client-go/kubernetes"
// clientsetscheme "k8s.io/client-go/kubernetes/scheme"
// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme"
// )
//
// kclientset, _ := kubernetes.NewForConfig(c)
// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme)
//
// After this, RawExtensions in Kubernetes types will serialize kube-aggregator types
// correctly.
var AddToScheme = localSchemeBuilder.AddToScheme

func init() {
v1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
utilruntime.Must(AddToScheme(Scheme))
}

0 comments on commit 4c38cef

Please sign in to comment.