Address PR feedback
neosab committed Nov 8, 2018
1 parent 32aa096 commit e9dad0d
Showing 4 changed files with 49 additions and 63 deletions.
58 changes: 33 additions & 25 deletions pkg/provisioners/kafka/controller/channel/reconcile.go
@@ -37,10 +37,13 @@ import (
const (
finalizerName = controllerAgentName

- ArgumentNumPartitions = "NumPartitions"
- DefaultNumPartitions = 1
+ DefaultNumPartitions = 1
)

+ type channelArgs struct {
+ 	NumPartitions int32 `json:"NumPartitions,omitempty"`
+ }

// Reconcile compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Channel resource
// with the current status of the resource.
@@ -50,11 +53,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
channel := &v1alpha1.Channel{}
err := r.client.Get(context.TODO(), request.NamespacedName, channel)

+ // The Channel may have been deleted since it was added to the workqueue. If so, there is
+ // nothing to be done since the dependent resources would have been deleted as well.
if errors.IsNotFound(err) {
r.logger.Info("could not find channel", zap.Any("request", request))
return reconcile.Result{}, nil
}

+ // Any other error should be retried in another reconciliation.
if err != nil {
r.logger.Error("could not fetch channel", zap.Error(err))
return reconcile.Result{}, err
@@ -88,32 +94,37 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
err = fmt.Errorf("ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name)
}

- if err := r.updateChannel(ctx, newChannel); err != nil {
- r.logger.Info("failed to update channel status", zap.Error(err))
- return reconcile.Result{}, err
+ if updateChannelErr := r.updateChannel(ctx, newChannel); updateChannelErr != nil {
+ r.logger.Info("failed to update channel status", zap.Error(updateChannelErr))
+ return reconcile.Result{}, updateChannelErr
}

// Requeue if the resource is not ready:
return reconcile.Result{}, err
}

func (r *reconciler) reconcile(channel *v1alpha1.Channel) error {
- // See if the channel has been deleted
- accessor, err := meta.Accessor(channel)
- if err != nil {
- r.logger.Info("failed to get metadata", zap.Error(err))
- return err
- }

+ // We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time.
+ // This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162.
+ // Once the issue is fixed we should use a shared cluster admin client. Also, r.kafkaClusterAdmin is currently
+ // used to pass a fake admin client in the tests.
kafkaClusterAdmin := r.kafkaClusterAdmin
if kafkaClusterAdmin == nil {
+ var err error
kafkaClusterAdmin, err = createKafkaAdminClient(r.config)
if err != nil {
r.logger.Fatal("unable to build kafka admin client", zap.Error(err))
return err
}
}

+ // See if the channel has been deleted
+ accessor, err := meta.Accessor(channel)
+ if err != nil {
+ r.logger.Info("failed to get metadata", zap.Error(err))
+ return err
+ }
deletionTimestamp := accessor.GetDeletionTimestamp()
if deletionTimestamp != nil {
r.logger.Info(fmt.Sprintf("DeletionTimestamp: %v", deletionTimestamp))
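As the comment above notes, a fresh cluster admin client is built on every reconcile until Shopify/sarama#1162 is resolved. createKafkaAdminClient itself is outside this diff; a minimal sketch of what it plausibly looks like with sarama's admin API, assuming the provisioner config carries a list of broker addresses:

// Sketch only: the real createKafkaAdminClient and config type are defined
// elsewhere in this package; the Brokers field here is an assumption.
package kafka

import "github.com/Shopify/sarama"

// KafkaProvisionerConfig stands in for the provisioner's actual config struct.
type KafkaProvisionerConfig struct {
	Brokers []string
}

func createKafkaAdminClient(config *KafkaProvisionerConfig) (sarama.ClusterAdmin, error) {
	conf := sarama.NewConfig()
	// Topic-management requests need a broker protocol version that supports them.
	conf.Version = sarama.V1_1_0_0
	return sarama.NewClusterAdmin(config.Brokers, conf)
}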
@@ -142,26 +153,23 @@ func (r *reconciler) provisionChannel(channel *v1alpha1.Channel, kafkaClusterAdm
topicName := topicName(channel)
r.logger.Info("creating topic on kafka cluster", zap.String("topic", topicName))

- partitions := DefaultNumPartitions
+ var arguments channelArgs

if channel.Spec.Arguments != nil {
+ var err error
- arguments, err := unmarshalArguments(channel.Spec.Arguments.Raw)
+ arguments, err = unmarshalArguments(channel.Spec.Arguments.Raw)
if err != nil {
return err
}
- if num, ok := arguments[ArgumentNumPartitions]; ok {
- parsedNum, ok := num.(float64)
- if !ok {
- return fmt.Errorf("could not parse argument %s for channel %s", ArgumentNumPartitions, fmt.Sprintf("%s/%s", channel.Namespace, channel.Name))
- }
- partitions = int(parsedNum)
- }
}

+ if arguments.NumPartitions == 0 {
+ arguments.NumPartitions = DefaultNumPartitions
+ }

err := kafkaClusterAdmin.CreateTopic(topicName, &sarama.TopicDetail{
ReplicationFactor: 1,
- NumPartitions: int32(partitions),
+ NumPartitions: arguments.NumPartitions,
}, false)
if err == sarama.ErrTopicAlreadyExists {
return nil
@@ -246,12 +254,12 @@ func topicName(channel *v1alpha1.Channel) string {
return fmt.Sprintf("%s.%s", channel.Namespace, channel.Name)
}

- // unmarshalArguments unmarshal's a json/yaml serialized input and returns a map structure
- func unmarshalArguments(bytes []byte) (map[string]interface{}, error) {
- arguments := make(map[string]interface{})
+ // unmarshalArguments unmarshal's a json/yaml serialized input and returns channelArgs
+ func unmarshalArguments(bytes []byte) (channelArgs, error) {
+ var arguments channelArgs
if len(bytes) > 0 {
if err := json.Unmarshal(bytes, &arguments); err != nil {
- return nil, fmt.Errorf("error unmarshalling arguments: %s", err)
+ return arguments, fmt.Errorf("error unmarshalling arguments: %s", err)
}
}
return arguments, nil
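The effect of the new channelArgs path is easiest to see in isolation. The following standalone sketch (not part of the commit) mirrors the code above: an absent NumPartitions falls back to DefaultNumPartitions through the int32 zero value, while a non-numeric value now fails inside json.Unmarshal with the struct-field error that the updated test in reconcile_test.go below expects, instead of the old hand-rolled parse error.

package main

import (
	"encoding/json"
	"fmt"
)

const DefaultNumPartitions = 1

type channelArgs struct {
	NumPartitions int32 `json:"NumPartitions,omitempty"`
}

func unmarshalArguments(bytes []byte) (channelArgs, error) {
	var arguments channelArgs
	if len(bytes) > 0 {
		if err := json.Unmarshal(bytes, &arguments); err != nil {
			return arguments, fmt.Errorf("error unmarshalling arguments: %s", err)
		}
	}
	return arguments, nil
}

func main() {
	for _, raw := range []string{`{"NumPartitions": 2}`, `{}`, `{"NumPartitions": "invalid"}`} {
		args, err := unmarshalArguments([]byte(raw))
		if err != nil {
			// json: cannot unmarshal string into Go struct field
			// channelArgs.NumPartitions of type int32
			fmt.Println(err)
			continue
		}
		if args.NumPartitions == 0 {
			args.NumPartitions = DefaultNumPartitions // zero value doubles as "not set"
		}
		fmt.Println(args.NumPartitions) // prints 2, then 1
	}
}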
11 changes: 6 additions & 5 deletions pkg/provisioners/kafka/controller/channel/reconcile_test.go
@@ -43,6 +43,7 @@ const (
channelName = "test-channel"
clusterChannelProvisionerName = "kafka-channel"
testNS = "test-namespace"
+ argumentNumPartitions = "NumPartitions"
)

var (
@@ -264,8 +265,8 @@ func TestProvisionChannel(t *testing.T) {
},
{
name: "provision with invalid channel arguments - errors",
- c: getNewChannelWithArgs(channelName, map[string]interface{}{ArgumentNumPartitions: "invalid"}),
- wantError: fmt.Sprintf("could not parse argument %s for channel test-namespace/test-channel", ArgumentNumPartitions),
+ c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: "invalid"}),
+ wantError: fmt.Sprintf("error unmarshalling arguments: json: cannot unmarshal string into Go struct field channelArgs.%s of type int32", argumentNumPartitions),
},
{
name: "provision with unmarshallable channel arguments - errors",
@@ -280,7 +281,7 @@
},
{
name: "provision with valid channel arguments",
- c: getNewChannelWithArgs(channelName, map[string]interface{}{ArgumentNumPartitions: 2}),
+ c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: 2}),
wantTopicName: fmt.Sprintf("%s.%s", testNS, channelName),
wantTopicDetail: &sarama.TopicDetail{
ReplicationFactor: 1,
@@ -289,7 +290,7 @@
},
{
name: "provision but topic already exists - no error",
- c: getNewChannelWithArgs(channelName, map[string]interface{}{ArgumentNumPartitions: 2}),
+ c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: 2}),
wantTopicName: fmt.Sprintf("%s.%s", testNS, channelName),
wantTopicDetail: &sarama.TopicDetail{
ReplicationFactor: 1,
@@ -299,7 +300,7 @@
},
{
name: "provision but error creating topic",
- c: getNewChannelWithArgs(channelName, map[string]interface{}{ArgumentNumPartitions: 2}),
+ c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: 2}),
wantTopicName: fmt.Sprintf("%s.%s", testNS, channelName),
wantTopicDetail: &sarama.TopicDetail{
ReplicationFactor: 1,
42 changes: 9 additions & 33 deletions pkg/provisioners/kafka/controller/reconcile.go
@@ -20,13 +20,13 @@ import (
"context"
"fmt"

"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
util "github.com/knative/eventing/pkg/provisioners"
)

const (
@@ -38,6 +38,7 @@
// converge the two. It then updates the Status block of the Provisioner resource
// with the current status of the resource.
func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
+ ctx := context.TODO()
r.logger.Info("reconciling ClusterChannelProvisioner", zap.Any("request", request))
provisioner := &v1alpha1.ClusterChannelProvisioner{}
err := r.client.Get(context.TODO(), request.NamespacedName, provisioner)
@@ -58,20 +59,14 @@
return reconcile.Result{}, nil
}

- original := provisioner.DeepCopy()
+ newProvisioner := provisioner.DeepCopy()

// Reconcile this copy of the Provisioner and then write back any status
// updates regardless of whether the reconcile error out.
- err = r.reconcile(provisioner)
- if !equality.Semantic.DeepEqual(original.Status, provisioner.Status) {
- // If we didn't change anything then don't call updateStatus.
- // This is important because the copy we loaded from the informer's
- // cache may be stale and we don't want to overwrite a prior update
- // to status with this stale state.
- if _, err := r.updateStatus(provisioner); err != nil {
- r.logger.Info("failed to update Provisioner status", zap.Error(err))
- return reconcile.Result{}, err
- }
+ err = r.reconcile(newProvisioner)
+ if updateStatusErr := util.UpdateClusterChannelProvisionerStatus(ctx, r.client, newProvisioner); updateStatusErr != nil {
+ r.logger.Info("error updating ClusterChannelProvisioner Status", zap.Error(updateStatusErr))
+ return reconcile.Result{}, updateStatusErr
}

// Requeue if the resource is not ready:
@@ -97,22 +92,3 @@

return nil
}

- func (r *reconciler) updateStatus(provisioner *v1alpha1.ClusterChannelProvisioner) (*v1alpha1.ClusterChannelProvisioner, error) {
- newProvisioner := &v1alpha1.ClusterChannelProvisioner{}
- err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: provisioner.Namespace, Name: provisioner.Name}, newProvisioner)
-
- if err != nil {
- return nil, err
- }
- newProvisioner.Status = provisioner.Status
-
- // Until #38113 is merged, we must use Update instead of UpdateStatus to
- // update the Status block of the Provisioner resource. UpdateStatus will not
- // allow changes to the Spec of the resource, which is ideal for ensuring
- // nothing other than resource status has been updated.
- if err = r.client.Update(context.TODO(), newProvisioner); err != nil {
- return nil, err
- }
- return newProvisioner, nil
- }
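The updateStatus helper removed above is superseded by the shared util.UpdateClusterChannelProvisionerStatus from pkg/provisioners, whose body this diff does not show. Judging from the call site and the code it replaces, a plausible shape is the sketch below; the real helper may differ (for instance by keeping a DeepEqual guard before writing):

package provisioners

import (
	"context"

	"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// UpdateClusterChannelProvisionerStatus re-fetches the provisioner and writes
// back only its Status block, mirroring the deleted updateStatus above.
func UpdateClusterChannelProvisionerStatus(ctx context.Context, c client.Client, p *v1alpha1.ClusterChannelProvisioner) error {
	latest := &v1alpha1.ClusterChannelProvisioner{}
	key := client.ObjectKey{Namespace: p.Namespace, Name: p.Name}
	if err := c.Get(ctx, key, latest); err != nil {
		return err
	}
	latest.Status = p.Status
	// Update rather than a status-subresource call, matching the removed
	// helper's workaround until the subresource is available.
	return c.Update(ctx, latest)
}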
1 change: 1 addition & 0 deletions pkg/provisioners/kafka/main.go
@@ -60,6 +60,7 @@ func main() {
channel.ProvideController,
}

+ // TODO the underlying config map needs to be watched and the config should be reloaded if there is a change.
provisionerConfig, err := getProvisionerConfig()

if err != nil {
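The TODO above is not addressed by this commit. For illustration only, one conventional way to watch a ConfigMap with client-go informers; the namespace, ConfigMap name, and reload behavior here are all hypothetical:

package main

import (
	"log"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
)

func watchProvisionerConfig(stopCh <-chan struct{}) error {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return err
	}
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	factory.Core().V1().ConfigMaps().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: func(oldObj, newObj interface{}) {
			cm := newObj.(*corev1.ConfigMap)
			// Hypothetical names: filter for the provisioner's config map and
			// rebuild the provisioner config from cm.Data here.
			if cm.Namespace == "knative-eventing" && cm.Name == "kafka-channel-controller-config" {
				log.Printf("provisioner config changed: %v", cm.Data)
			}
		},
	})
	factory.Start(stopCh)
	return nil
}

func main() {
	stop := make(chan struct{})
	defer close(stop)
	if err := watchProvisionerConfig(stop); err != nil {
		log.Fatal(err)
	}
	select {} // real code would tie shutdown to the controller manager's lifecycle
}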
