From e9dad0d9906f88115e32e0439eb2d073ec83f01d Mon Sep 17 00:00:00 2001
From: Sabari Kumar Murugesan
Date: Thu, 8 Nov 2018 00:42:42 -0800
Subject: [PATCH] Address PR feedback

---
 .../kafka/controller/channel/reconcile.go    | 58 +++++++++++--------
 .../controller/channel/reconcile_test.go     | 11 ++--
 .../kafka/controller/reconcile.go            | 42 +++-----------
 pkg/provisioners/kafka/main.go               |  1 +
 4 files changed, 49 insertions(+), 63 deletions(-)

diff --git a/pkg/provisioners/kafka/controller/channel/reconcile.go b/pkg/provisioners/kafka/controller/channel/reconcile.go
index 118de57b998..dc5248dfcdf 100644
--- a/pkg/provisioners/kafka/controller/channel/reconcile.go
+++ b/pkg/provisioners/kafka/controller/channel/reconcile.go
@@ -37,10 +37,13 @@ import (
 const (
 	finalizerName = controllerAgentName
 
-	ArgumentNumPartitions = "NumPartitions"
-	DefaultNumPartitions  = 1
+	DefaultNumPartitions = 1
 )
 
+type channelArgs struct {
+	NumPartitions int32 `json:"NumPartitions,omitempty"`
+}
+
 // Reconcile compares the actual state with the desired, and attempts to
 // converge the two. It then updates the Status block of the Channel resource
 // with the current status of the resource.
@@ -50,11 +53,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
 	channel := &v1alpha1.Channel{}
 	err := r.client.Get(context.TODO(), request.NamespacedName, channel)
 
+	// The Channel may have been deleted since it was added to the workqueue. If so, there is
+	// nothing to be done since the dependent resources would have been deleted as well.
 	if errors.IsNotFound(err) {
 		r.logger.Info("could not find channel", zap.Any("request", request))
 		return reconcile.Result{}, nil
 	}
+	// Any other error should be retried in another reconciliation.
 	if err != nil {
 		r.logger.Error("could not fetch channel", zap.Error(err))
 		return reconcile.Result{}, err
 	}
@@ -88,9 +94,9 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
 		err = fmt.Errorf("ClusterChannelProvisioner %s is not ready", clusterChannelProvisioner.Name)
 	}
 
-	if err := r.updateChannel(ctx, newChannel); err != nil {
-		r.logger.Info("failed to update channel status", zap.Error(err))
-		return reconcile.Result{}, err
+	if updateChannelErr := r.updateChannel(ctx, newChannel); updateChannelErr != nil {
+		r.logger.Info("failed to update channel status", zap.Error(updateChannelErr))
+		return reconcile.Result{}, updateChannelErr
 	}
 
 	// Requeue if the resource is not ready:
@@ -98,15 +104,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
 }
 
 func (r *reconciler) reconcile(channel *v1alpha1.Channel) error {
-	// See if the channel has been deleted
-	accessor, err := meta.Accessor(channel)
-	if err != nil {
-		r.logger.Info("failed to get metadata", zap.Error(err))
-		return err
-	}
+	// We don't currently initialize r.kafkaClusterAdmin, hence we end up creating the cluster admin client every time.
+	// This is because of an issue with Shopify/sarama. See https://github.com/Shopify/sarama/issues/1162.
+	// Once the issue is fixed we should use a shared cluster admin client. Also, r.kafkaClusterAdmin is currently
+	// used to pass a fake admin client in the tests.
 
 	kafkaClusterAdmin := r.kafkaClusterAdmin
 	if kafkaClusterAdmin == nil {
+		var err error
 		kafkaClusterAdmin, err = createKafkaAdminClient(r.config)
 		if err != nil {
 			r.logger.Fatal("unable to build kafka admin client", zap.Error(err))
@@ -114,6 +119,12 @@ func (r *reconciler) reconcile(channel *v1alpha1.Channel) error {
 		}
 	}
 
+	// See if the channel has been deleted
+	accessor, err := meta.Accessor(channel)
+	if err != nil {
+		r.logger.Info("failed to get metadata", zap.Error(err))
+		return err
+	}
 	deletionTimestamp := accessor.GetDeletionTimestamp()
 	if deletionTimestamp != nil {
 		r.logger.Info(fmt.Sprintf("DeletionTimestamp: %v", deletionTimestamp))
@@ -142,26 +153,23 @@ func (r *reconciler) provisionChannel(channel *v1alpha1.Channel, kafkaClusterAdm
 	topicName := topicName(channel)
 	r.logger.Info("creating topic on kafka cluster", zap.String("topic", topicName))
 
-	partitions := DefaultNumPartitions
+	var arguments channelArgs
 	if channel.Spec.Arguments != nil {
 		var err error
-		arguments, err := unmarshalArguments(channel.Spec.Arguments.Raw)
+		arguments, err = unmarshalArguments(channel.Spec.Arguments.Raw)
 		if err != nil {
 			return err
 		}
-		if num, ok := arguments[ArgumentNumPartitions]; ok {
-			parsedNum, ok := num.(float64)
-			if !ok {
-				return fmt.Errorf("could not parse argument %s for channel %s", ArgumentNumPartitions, fmt.Sprintf("%s/%s", channel.Namespace, channel.Name))
-			}
-			partitions = int(parsedNum)
-		}
+	}
+
+	if arguments.NumPartitions == 0 {
+		arguments.NumPartitions = DefaultNumPartitions
 	}
 
 	err := kafkaClusterAdmin.CreateTopic(topicName, &sarama.TopicDetail{
 		ReplicationFactor: 1,
-		NumPartitions: int32(partitions),
+		NumPartitions: arguments.NumPartitions,
 	}, false)
 	if err == sarama.ErrTopicAlreadyExists {
 		return nil
@@ -246,12 +254,12 @@ func topicName(channel *v1alpha1.Channel) string {
 	return fmt.Sprintf("%s.%s", channel.Namespace, channel.Name)
 }
 
-// unmarshalArguments unmarshal's a json/yaml serialized input and returns a map structure
-func unmarshalArguments(bytes []byte) (map[string]interface{}, error) {
-	arguments := make(map[string]interface{})
+// unmarshalArguments unmarshals a json/yaml serialized input and returns channelArgs
+func unmarshalArguments(bytes []byte) (channelArgs, error) {
+	var arguments channelArgs
 	if len(bytes) > 0 {
 		if err := json.Unmarshal(bytes, &arguments); err != nil {
-			return nil, fmt.Errorf("error unmarshalling arguments: %s", err)
+			return arguments, fmt.Errorf("error unmarshalling arguments: %s", err)
 		}
 	}
 	return arguments, nil
diff --git a/pkg/provisioners/kafka/controller/channel/reconcile_test.go b/pkg/provisioners/kafka/controller/channel/reconcile_test.go
index 224d6b335a4..6becf1e1cbe 100644
--- a/pkg/provisioners/kafka/controller/channel/reconcile_test.go
+++ b/pkg/provisioners/kafka/controller/channel/reconcile_test.go
@@ -43,6 +43,7 @@ const (
 	channelName = "test-channel"
 	clusterChannelProvisionerName = "kafka-channel"
 	testNS = "test-namespace"
+	argumentNumPartitions = "NumPartitions"
 )
 
 var (
@@ -264,8 +265,8 @@ func TestProvisionChannel(t *testing.T) {
 		},
 		{
 			name: "provision with invalid channel arguments - errors",
-			c: getNewChannelWithArgs(channelName, map[string]interface{}{ArgumentNumPartitions: "invalid"}),
-			wantError: fmt.Sprintf("could not parse argument %s for channel test-namespace/test-channel", ArgumentNumPartitions),
+			c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: "invalid"}),
+			wantError: fmt.Sprintf("error unmarshalling arguments: json: cannot unmarshal string into Go struct field channelArgs.%s of type int32", argumentNumPartitions),
 		},
 		{
 			name: "provision with unmarshallable channel arguments - errors",
@@ -280,7 +281,7 @@ func TestProvisionChannel(t *testing.T) {
 		},
 		{
 			name: "provision with valid channel arguments",
-			c: getNewChannelWithArgs(channelName, map[string]interface{}{ArgumentNumPartitions: 2}),
+			c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: 2}),
 			wantTopicName: fmt.Sprintf("%s.%s", testNS, channelName),
 			wantTopicDetail: &sarama.TopicDetail{
 				ReplicationFactor: 1,
@@ -289,7 +290,7 @@ func TestProvisionChannel(t *testing.T) {
 		},
 		{
 			name: "provision but topic already exists - no error",
-			c: getNewChannelWithArgs(channelName, map[string]interface{}{ArgumentNumPartitions: 2}),
+			c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: 2}),
 			wantTopicName: fmt.Sprintf("%s.%s", testNS, channelName),
 			wantTopicDetail: &sarama.TopicDetail{
 				ReplicationFactor: 1,
@@ -299,7 +300,7 @@ func TestProvisionChannel(t *testing.T) {
 		},
 		{
 			name: "provision but error creating topic",
-			c: getNewChannelWithArgs(channelName, map[string]interface{}{ArgumentNumPartitions: 2}),
+			c: getNewChannelWithArgs(channelName, map[string]interface{}{argumentNumPartitions: 2}),
 			wantTopicName: fmt.Sprintf("%s.%s", testNS, channelName),
 			wantTopicDetail: &sarama.TopicDetail{
 				ReplicationFactor: 1,
diff --git a/pkg/provisioners/kafka/controller/reconcile.go b/pkg/provisioners/kafka/controller/reconcile.go
index 0f72d7fc4a0..dd5d84b47a0 100644
--- a/pkg/provisioners/kafka/controller/reconcile.go
+++ b/pkg/provisioners/kafka/controller/reconcile.go
@@ -20,13 +20,13 @@ import (
 	"context"
 	"fmt"
 
-	"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
 	"go.uber.org/zap"
-	"k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
+
+	"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
+	util "github.com/knative/eventing/pkg/provisioners"
 )
 
 const (
@@ -38,6 +38,7 @@
 // converge the two. It then updates the Status block of the Provisioner resource
 // with the current status of the resource.
 func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
+	ctx := context.TODO()
 	r.logger.Info("reconciling ClusterChannelProvisioner", zap.Any("request", request))
 	provisioner := &v1alpha1.ClusterChannelProvisioner{}
 	err := r.client.Get(context.TODO(), request.NamespacedName, provisioner)
@@ -58,20 +59,14 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
 		return reconcile.Result{}, nil
 	}
 
-	original := provisioner.DeepCopy()
+	newProvisioner := provisioner.DeepCopy()
 
 	// Reconcile this copy of the Provisioner and then write back any status
 	// updates regardless of whether the reconcile error out.
-	err = r.reconcile(provisioner)
-	if !equality.Semantic.DeepEqual(original.Status, provisioner.Status) {
-		// If we didn't change anything then don't call updateStatus.
-		// This is important because the copy we loaded from the informer's
-		// cache may be stale and we don't want to overwrite a prior update
-		// to status with this stale state.
-		if _, err := r.updateStatus(provisioner); err != nil {
-			r.logger.Info("failed to update Provisioner status", zap.Error(err))
-			return reconcile.Result{}, err
-		}
+	err = r.reconcile(newProvisioner)
+	if updateStatusErr := util.UpdateClusterChannelProvisionerStatus(ctx, r.client, newProvisioner); updateStatusErr != nil {
+		r.logger.Info("error updating ClusterChannelProvisioner Status", zap.Error(updateStatusErr))
+		return reconcile.Result{}, updateStatusErr
 	}
 
 	// Requeue if the resource is not ready:
@@ -97,22 +92,3 @@ func (r *reconciler) reconcile(provisioner *v1alpha1.ClusterChannelProvisioner)
 
 	return nil
 }
-
-func (r *reconciler) updateStatus(provisioner *v1alpha1.ClusterChannelProvisioner) (*v1alpha1.ClusterChannelProvisioner, error) {
-	newProvisioner := &v1alpha1.ClusterChannelProvisioner{}
-	err := r.client.Get(context.TODO(), client.ObjectKey{Namespace: provisioner.Namespace, Name: provisioner.Name}, newProvisioner)
-
-	if err != nil {
-		return nil, err
-	}
-	newProvisioner.Status = provisioner.Status
-
-	// Until #38113 is merged, we must use Update instead of UpdateStatus to
-	// update the Status block of the Provisioner resource. UpdateStatus will not
-	// allow changes to the Spec of the resource, which is ideal for ensuring
-	// nothing other than resource status has been updated.
-	if err = r.client.Update(context.TODO(), newProvisioner); err != nil {
-		return nil, err
-	}
-	return newProvisioner, nil
-}
diff --git a/pkg/provisioners/kafka/main.go b/pkg/provisioners/kafka/main.go
index 6da4e1932fa..65b97003b68 100644
--- a/pkg/provisioners/kafka/main.go
+++ b/pkg/provisioners/kafka/main.go
@@ -60,6 +60,7 @@ func main() {
 		channel.ProvideController,
 	}
 
+	// TODO the underlying config map needs to be watched and the config should be reloaded if there is a change.
 	provisionerConfig, err := getProvisionerConfig()
 	if err != nil {