diff --git a/Gopkg.lock b/Gopkg.lock index 18a873d1471..1b91b7f08f2 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -351,7 +351,7 @@ version = "v0.2.0" [[projects]] - digest = "1:9e87cc32eebacc20127e352edabb39625a3f82473b6b1b1b3333627f3cf37f97" + digest = "1:efa519d11e430e97a33f8c5b6bd902876942b1723853d37f0b0b96f5ee33956f" name = "github.com/knative/pkg" packages = [ "apis", @@ -368,6 +368,8 @@ "logging", "logging/logkey", "signals", + "system", + "system/testing", "test", "test/logging", "test/spoof", @@ -375,7 +377,7 @@ "webhook", ] pruneopts = "NUT" - revision = "d3a9e54be7d4213b4018135d82b586de45f6a694" + revision = "ac1f8182274a10bde23df8f964484bce87f06752" [[projects]] digest = "1:9a3d5c1186e9299dc3c3b9f28c3521abdf6abf0834f8ddd169ab005762097378" @@ -1225,6 +1227,8 @@ "github.com/knative/pkg/logging", "github.com/knative/pkg/logging/logkey", "github.com/knative/pkg/signals", + "github.com/knative/pkg/system", + "github.com/knative/pkg/system/testing", "github.com/knative/pkg/test", "github.com/knative/pkg/test/logging", "github.com/knative/pkg/webhook", diff --git a/Gopkg.toml b/Gopkg.toml index 89c912172e3..b6f00627ed3 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -94,8 +94,8 @@ required = [ # TODO why is this overridden? [[override]] name = "github.com/knative/pkg" - # HEAD as of 2019-01-16 - revision = "d3a9e54be7d4213b4018135d82b586de45f6a694" + # HEAD as of 2019-2-11 + revision = "ac1f8182274a10bde23df8f964484bce87f06752" [[constraint]] name = "github.com/knative/serving" diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 736ccab08db..07bd8d137e3 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -33,7 +33,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logconfig" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "github.com/knative/pkg/configmap" "github.com/knative/pkg/logging" @@ -92,7 +92,7 @@ func main() { } // Watch the logging config map and dynamically update logging levels. - configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace) + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace()) configMapWatcher.Watch(logconfig.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller, logconfig.Controller)) if err = configMapWatcher.Start(stopCh); err != nil { logger.Fatalf("Failed to start controller config map watcher: %v", err) diff --git a/cmd/fanoutsidecar/main.go b/cmd/fanoutsidecar/main.go index 629babe71e2..2c87d0ff048 100644 --- a/cmd/fanoutsidecar/main.go +++ b/cmd/fanoutsidecar/main.go @@ -31,7 +31,7 @@ import ( "github.com/knative/eventing/pkg/sidecar/configmap/filesystem" "github.com/knative/eventing/pkg/sidecar/configmap/watcher" "github.com/knative/eventing/pkg/sidecar/swappable" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" "go.uber.org/zap" "golang.org/x/sync/errgroup" "k8s.io/client-go/kubernetes" @@ -61,7 +61,7 @@ var ( func init() { flag.IntVar(&port, "sidecar_port", -1, "The port to run the sidecar on.") flag.StringVar(&configMapNoticer, "config_map_noticer", "", fmt.Sprintf("The system to notice changes to the ConfigMap. Valid values are: %s", configMapNoticerValues())) - flag.StringVar(&configMapNamespace, "config_map_namespace", system.Namespace, "The namespace of the ConfigMap that is watched for configuration.") + flag.StringVar(&configMapNamespace, "config_map_namespace", system.Namespace(), "The namespace of the ConfigMap that is watched for configuration.") flag.StringVar(&configMapName, "config_map_name", defaultConfigMapName, "The name of the ConfigMap that is watched for configuration.") } diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index e35ff3d7304..cac84620e6d 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -31,7 +31,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logconfig" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes" @@ -69,7 +69,7 @@ func main() { } // Watch the logging config map and dynamically update logging levels. - configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace) + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace()) configMapWatcher.Watch(logconfig.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Webhook, logconfig.Webhook)) @@ -86,7 +86,7 @@ func main() { options := webhook.ControllerOptions{ ServiceName: "webhook", DeploymentName: "webhook", - Namespace: system.Namespace, + Namespace: system.Namespace(), Port: 443, SecretName: "webhook-certs", WebhookName: "webhook.eventing.knative.dev", diff --git a/config/500-controller.yaml b/config/500-controller.yaml index e8aec835afe..5fbc2750cd3 100644 --- a/config/500-controller.yaml +++ b/config/500-controller.yaml @@ -37,6 +37,11 @@ spec: volumeMounts: - name: config-logging mountPath: /etc/config-logging + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace volumes: - name: config-logging configMap: diff --git a/config/500-webhook.yaml b/config/500-webhook.yaml index dd7cb154b20..2d5f1877f2a 100644 --- a/config/500-webhook.yaml +++ b/config/500-webhook.yaml @@ -39,6 +39,11 @@ spec: volumeMounts: - name: config-logging mountPath: /etc/config-logging + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace volumes: - name: config-logging configMap: diff --git a/config/provisioners/in-memory-channel/in-memory-channel.yaml b/config/provisioners/in-memory-channel/in-memory-channel.yaml index aa71f95a19e..f80fedf0125 100644 --- a/config/provisioners/in-memory-channel/in-memory-channel.yaml +++ b/config/provisioners/in-memory-channel/in-memory-channel.yaml @@ -147,6 +147,11 @@ spec: containers: - name: controller image: github.com/knative/eventing/pkg/provisioners/inmemory/controller + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace --- @@ -217,3 +222,8 @@ spec: - --config_map_noticer=watcher - --config_map_namespace=knative-eventing - --config_map_name=in-memory-channel-dispatcher-config-map + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace diff --git a/contrib/gcppubsub/config/gcppubsub.yaml b/contrib/gcppubsub/config/gcppubsub.yaml index 7e373b50280..9c6b08f3ff4 100644 --- a/contrib/gcppubsub/config/gcppubsub.yaml +++ b/contrib/gcppubsub/config/gcppubsub.yaml @@ -126,6 +126,11 @@ spec: value: gcppubsub-channel-key - name: DEFAULT_SECRET_KEY value: key.json + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace --- @@ -209,6 +214,20 @@ spec: containers: - name: dispatcher image: github.com/knative/eventing/contrib/gcppubsub/pkg/dispatcher/cmd + env: + - name: DEFAULT_GCP_PROJECT + value: REPLACE_WITH_GCP_PROJECT + - name: DEFAULT_SECRET_NAMESPACE + value: knative-eventing + - name: DEFAULT_SECRET_NAME + value: gcppubsub-channel-key + - name: DEFAULT_SECRET_KEY + value: key.json + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace --- diff --git a/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go b/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go index 03ac497dca0..0ad364609c1 100644 --- a/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go +++ b/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go @@ -37,6 +37,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" util "github.com/knative/eventing/pkg/provisioners" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + _ "github.com/knative/pkg/system/testing" "github.com/knative/eventing/pkg/utils" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" diff --git a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile_test.go b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile_test.go index 75fd60cf0eb..08c7b84268b 100644 --- a/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile_test.go +++ b/contrib/gcppubsub/pkg/controller/clusterchannelprovisioner/reconcile_test.go @@ -24,6 +24,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + _ "github.com/knative/pkg/system/testing" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" diff --git a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go index d8078590f02..7ed8937283e 100644 --- a/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go +++ b/contrib/gcppubsub/pkg/dispatcher/dispatcher/reconcile_test.go @@ -44,6 +44,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + _ "github.com/knative/pkg/system/testing" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/contrib/kafka/cmd/dispatcher/main.go b/contrib/kafka/cmd/dispatcher/main.go index 85e907ea657..55026281e67 100644 --- a/contrib/kafka/cmd/dispatcher/main.go +++ b/contrib/kafka/cmd/dispatcher/main.go @@ -31,7 +31,7 @@ import ( provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller" "github.com/knative/eventing/contrib/kafka/pkg/dispatcher" "github.com/knative/eventing/pkg/sidecar/configmap/watcher" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" ) func main() { @@ -42,7 +42,7 @@ func main() { } configMapNamespace := os.Getenv("DISPATCHER_CONFIGMAP_NAMESPACE") if configMapNamespace == "" { - configMapNamespace = system.Namespace + configMapNamespace = system.Namespace() } logger, err := zap.NewProduction() diff --git a/contrib/kafka/config/kafka.yaml b/contrib/kafka/config/kafka.yaml index 0814dd81a05..6491ed0f5fa 100644 --- a/contrib/kafka/config/kafka.yaml +++ b/contrib/kafka/config/kafka.yaml @@ -124,6 +124,11 @@ spec: volumeMounts: - name: kafka-channel-controller-config mountPath: /etc/config-provisioner + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace volumes: - name: kafka-channel-controller-config configMap: diff --git a/contrib/kafka/pkg/controller/channel/provider.go b/contrib/kafka/pkg/controller/channel/provider.go index f5389a18e38..7c9d413d246 100644 --- a/contrib/kafka/pkg/controller/channel/provider.go +++ b/contrib/kafka/pkg/controller/channel/provider.go @@ -32,7 +32,7 @@ import ( common "github.com/knative/eventing/contrib/kafka/pkg/controller" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" ) const ( @@ -43,7 +43,7 @@ const ( var ( defaultConfigMapKey = types.NamespacedName{ - Namespace: system.Namespace, + Namespace: system.Namespace(), Name: common.DispatcherConfigMapName, } ) diff --git a/contrib/kafka/pkg/controller/channel/provider_test.go b/contrib/kafka/pkg/controller/channel/provider_test.go index 43d7eb88cc2..d75daa96dc5 100644 --- a/contrib/kafka/pkg/controller/channel/provider_test.go +++ b/contrib/kafka/pkg/controller/channel/provider_test.go @@ -19,6 +19,7 @@ package channel import ( "testing" + _ "github.com/knative/pkg/system/testing" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) diff --git a/contrib/kafka/pkg/controller/channel/reconcile_test.go b/contrib/kafka/pkg/controller/channel/reconcile_test.go index 4490bf861c3..1aa466cfdc4 100644 --- a/contrib/kafka/pkg/controller/channel/reconcile_test.go +++ b/contrib/kafka/pkg/controller/channel/reconcile_test.go @@ -31,6 +31,7 @@ import ( "github.com/knative/eventing/pkg/provisioners" util "github.com/knative/eventing/pkg/provisioners" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + _ "github.com/knative/pkg/system/testing" "github.com/knative/eventing/pkg/utils" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" @@ -538,7 +539,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, Route: []istiov1alpha3.DestinationWeight{{ Destination: istiov1alpha3.Destination{ - Host: "kafka-provisioner.knative-eventing.svc." + utils.GetClusterDomainName(), + Host: "kafka-provisioner.knative-testing.svc." + utils.GetClusterDomainName(), Port: istiov1alpha3.PortSelector{ Number: util.PortNumber, }, diff --git a/contrib/kafka/pkg/controller/provider_test.go b/contrib/kafka/pkg/controller/provider_test.go index 9b648acba67..22cc93f5c2b 100644 --- a/contrib/kafka/pkg/controller/provider_test.go +++ b/contrib/kafka/pkg/controller/provider_test.go @@ -19,6 +19,7 @@ package controller import ( "testing" + _ "github.com/knative/pkg/system/testing" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) diff --git a/contrib/kafka/pkg/controller/reconcile_test.go b/contrib/kafka/pkg/controller/reconcile_test.go index e3eb8eb28f9..6e69b6aa57a 100644 --- a/contrib/kafka/pkg/controller/reconcile_test.go +++ b/contrib/kafka/pkg/controller/reconcile_test.go @@ -25,6 +25,7 @@ import ( "github.com/knative/eventing/pkg/provisioners" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + _ "github.com/knative/pkg/system/testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" diff --git a/contrib/kafka/pkg/controller/util_test.go b/contrib/kafka/pkg/controller/util_test.go index dfbd8d48f1f..66867498d3a 100644 --- a/contrib/kafka/pkg/controller/util_test.go +++ b/contrib/kafka/pkg/controller/util_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + _ "github.com/knative/pkg/system/testing" ) func TestGetProvisionerConfigBrokers(t *testing.T) { diff --git a/contrib/kafka/pkg/dispatcher/dispatcher_test.go b/contrib/kafka/pkg/dispatcher/dispatcher_test.go index f8d51a93b5a..4d41c9be486 100644 --- a/contrib/kafka/pkg/dispatcher/dispatcher_test.go +++ b/contrib/kafka/pkg/dispatcher/dispatcher_test.go @@ -18,6 +18,7 @@ import ( "github.com/knative/eventing/pkg/provisioners" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + _ "github.com/knative/pkg/system/testing" ) type mockConsumer struct { diff --git a/contrib/natss/config/provisioner.yaml b/contrib/natss/config/provisioner.yaml index f2745d7521a..d1de4bf4f24 100644 --- a/contrib/natss/config/provisioner.yaml +++ b/contrib/natss/config/provisioner.yaml @@ -103,6 +103,11 @@ spec: containers: - name: controller image: github.com/knative/eventing/contrib/natss/pkg/controller + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace --- @@ -175,3 +180,8 @@ spec: containers: - name: dispatcher image: github.com/knative/eventing/contrib/natss/pkg/dispatcher + env: + - name: SYSTEM_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace diff --git a/contrib/natss/pkg/controller/channel/reconcile_test.go b/contrib/natss/pkg/controller/channel/reconcile_test.go index 08ece0796d1..322457dde06 100644 --- a/contrib/natss/pkg/controller/channel/reconcile_test.go +++ b/contrib/natss/pkg/controller/channel/reconcile_test.go @@ -24,6 +24,7 @@ import ( "github.com/knative/eventing/pkg/provisioners" util "github.com/knative/eventing/pkg/provisioners" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + _ "github.com/knative/pkg/system/testing" "github.com/knative/eventing/pkg/utils" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" diff --git a/contrib/natss/pkg/controller/clusterchannelprovisioner/reconcile_test.go b/contrib/natss/pkg/controller/clusterchannelprovisioner/reconcile_test.go index 64549c2fbfc..d0c1e14ec97 100644 --- a/contrib/natss/pkg/controller/clusterchannelprovisioner/reconcile_test.go +++ b/contrib/natss/pkg/controller/clusterchannelprovisioner/reconcile_test.go @@ -24,8 +24,9 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" - "github.com/knative/eventing/pkg/system" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/system" + _ "github.com/knative/pkg/system/testing" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -236,7 +237,7 @@ func makeK8sService() *corev1.Service { Kind: "Service", }, ObjectMeta: metav1.ObjectMeta{ - Namespace: system.Namespace, + Namespace: system.Namespace(), Name: fmt.Sprintf("%s-dispatcher", Name), OwnerReferences: []metav1.OwnerReference{ { diff --git a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go index bde83384f07..22ba61e94d8 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go +++ b/contrib/natss/pkg/dispatcher/dispatcher/dispatcher_test.go @@ -26,6 +26,7 @@ import ( "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" + _ "github.com/knative/pkg/system/testing" "github.com/nats-io/nats-streaming-server/server" "go.uber.org/zap" "go.uber.org/zap/zapcore" diff --git a/contrib/natss/pkg/stanutil/stanutil_test.go b/contrib/natss/pkg/stanutil/stanutil_test.go index 518c0ac9700..eaffad0e8cd 100644 --- a/contrib/natss/pkg/stanutil/stanutil_test.go +++ b/contrib/natss/pkg/stanutil/stanutil_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/knative/eventing/pkg/provisioners" + _ "github.com/knative/pkg/system/testing" "github.com/nats-io/nats-streaming-server/server" "go.uber.org/zap" ) diff --git a/pkg/provisioners/channel_util.go b/pkg/provisioners/channel_util.go index a9584a70e04..98f0b0065f3 100644 --- a/pkg/provisioners/channel_util.go +++ b/pkg/provisioners/channel_util.go @@ -17,7 +17,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/reconciler/names" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" "github.com/knative/eventing/pkg/utils" "k8s.io/apimachinery/pkg/api/equality" ) @@ -315,7 +315,7 @@ func virtualOldServiceLabels(c *eventingv1alpha1.Channel) map[string]string { // appropriate OwnerReferences on the resource so handleObject can discover the Channel resource // that 'owns' it. As well as being garbage collected when the Channel is deleted. func newVirtualService(channel *eventingv1alpha1.Channel, svc *corev1.Service) *istiov1alpha3.VirtualService { - destinationHost := names.ServiceHostName(channelDispatcherServiceName(channel.Spec.Provisioner.Name), system.Namespace) + destinationHost := names.ServiceHostName(channelDispatcherServiceName(channel.Spec.Provisioner.Name), system.Namespace()) return &istiov1alpha3.VirtualService{ ObjectMeta: metav1.ObjectMeta{ GenerateName: channelVirtualServiceName(channel.Name), diff --git a/pkg/provisioners/channel_util_test.go b/pkg/provisioners/channel_util_test.go index d490655c274..b909ae3d032 100644 --- a/pkg/provisioners/channel_util_test.go +++ b/pkg/provisioners/channel_util_test.go @@ -24,6 +24,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + _ "github.com/knative/pkg/system/testing" "github.com/knative/eventing/pkg/utils" ) @@ -647,7 +648,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, Route: []istiov1alpha3.DestinationWeight{{ Destination: istiov1alpha3.Destination{ - Host: fmt.Sprintf("%s-dispatcher.knative-eventing.svc.%s", clusterChannelProvisionerName, utils.GetClusterDomainName()), + Host: fmt.Sprintf("%s-dispatcher.knative-testing.svc.%s", clusterChannelProvisionerName, utils.GetClusterDomainName()), Port: istiov1alpha3.PortSelector{ Number: PortNumber, }, diff --git a/pkg/provisioners/inmemory/channel/controller.go b/pkg/provisioners/inmemory/channel/controller.go index b139d622f2a..1b03cca5fac 100644 --- a/pkg/provisioners/inmemory/channel/controller.go +++ b/pkg/provisioners/inmemory/channel/controller.go @@ -18,7 +18,7 @@ package channel import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" @@ -42,7 +42,7 @@ const ( var ( defaultConfigMapKey = types.NamespacedName{ - Namespace: system.Namespace, + Namespace: system.Namespace(), Name: ConfigMapName, } ) diff --git a/pkg/provisioners/inmemory/channel/reconcile_test.go b/pkg/provisioners/inmemory/channel/reconcile_test.go index 4097e7e1c42..c12a96c3e85 100644 --- a/pkg/provisioners/inmemory/channel/reconcile_test.go +++ b/pkg/provisioners/inmemory/channel/reconcile_test.go @@ -31,6 +31,7 @@ import ( "github.com/knative/eventing/pkg/sidecar/configmap" "github.com/knative/eventing/pkg/sidecar/fanout" "github.com/knative/eventing/pkg/sidecar/multichannelfanout" + _ "github.com/knative/pkg/system/testing" "github.com/knative/eventing/pkg/utils" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" "go.uber.org/zap" @@ -749,7 +750,7 @@ func makeVirtualService() *istiov1alpha3.VirtualService { }, Route: []istiov1alpha3.DestinationWeight{{ Destination: istiov1alpha3.Destination{ - Host: "in-memory-channel-dispatcher.knative-eventing.svc." + utils.GetClusterDomainName(), + Host: "in-memory-channel-dispatcher.knative-testing.svc." + utils.GetClusterDomainName(), Port: istiov1alpha3.PortSelector{ Number: util.PortNumber, }, diff --git a/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile.go b/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile.go index 04e19ca8933..5e79fc3c802 100644 --- a/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile.go +++ b/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile.go @@ -31,7 +31,7 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" util "github.com/knative/eventing/pkg/provisioners" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" ) const ( @@ -182,7 +182,7 @@ func (r *reconciler) reconcile(ctx context.Context, ccp *eventingv1alpha1.Cluste func (r *reconciler) deleteOldDispatcherService(ctx context.Context, ccp *eventingv1alpha1.ClusterChannelProvisioner) error { svcName := fmt.Sprintf("%s-clusterbus", ccp.Name) svcKey := types.NamespacedName{ - Namespace: system.Namespace, + Namespace: system.Namespace(), Name: svcName, } svc := &corev1.Service{} diff --git a/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile_test.go b/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile_test.go index 8359d8388e4..982d482e3de 100644 --- a/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile_test.go +++ b/pkg/provisioners/inmemory/clusterchannelprovisioner/reconcile_test.go @@ -35,7 +35,8 @@ import ( eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" util "github.com/knative/eventing/pkg/provisioners" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" + _ "github.com/knative/pkg/system/testing" ) const ( @@ -340,7 +341,7 @@ func makeK8sService() *corev1.Service { Kind: "Service", }, ObjectMeta: metav1.ObjectMeta{ - Namespace: system.Namespace, + Namespace: system.Namespace(), Name: fmt.Sprintf("%s-dispatcher", Name), OwnerReferences: []metav1.OwnerReference{ { diff --git a/pkg/provisioners/logger_test.go b/pkg/provisioners/logger_test.go index caa43c23160..d1c7d811d7a 100644 --- a/pkg/provisioners/logger_test.go +++ b/pkg/provisioners/logger_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + _ "github.com/knative/pkg/system/testing" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) diff --git a/pkg/provisioners/message_dispatcher_test.go b/pkg/provisioners/message_dispatcher_test.go index ac02f645ad4..d2f14ccc376 100644 --- a/pkg/provisioners/message_dispatcher_test.go +++ b/pkg/provisioners/message_dispatcher_test.go @@ -26,6 +26,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + _ "github.com/knative/pkg/system/testing" "go.uber.org/zap" ) diff --git a/pkg/provisioners/message_receiver_test.go b/pkg/provisioners/message_receiver_test.go index a5ca3839113..6411cc7f65e 100644 --- a/pkg/provisioners/message_receiver_test.go +++ b/pkg/provisioners/message_receiver_test.go @@ -26,6 +26,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + _ "github.com/knative/pkg/system/testing" "github.com/knative/eventing/pkg/utils" "go.uber.org/zap" diff --git a/pkg/provisioners/provisioner_util.go b/pkg/provisioners/provisioner_util.go index c6a0f84b045..131176fd1d9 100644 --- a/pkg/provisioners/provisioner_util.go +++ b/pkg/provisioners/provisioner_util.go @@ -15,13 +15,13 @@ import ( "fmt" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" "github.com/knative/pkg/logging" ) func CreateDispatcherService(ctx context.Context, client runtimeClient.Client, ccp *eventingv1alpha1.ClusterChannelProvisioner) (*corev1.Service, error) { svcKey := types.NamespacedName{ - Namespace: system.Namespace, + Namespace: system.Namespace(), Name: channelDispatcherServiceName(ccp.Name), } getSvc := func() (*corev1.Service, error) { @@ -55,7 +55,7 @@ func newDispatcherService(ccp *eventingv1alpha1.ClusterChannelProvisioner) *core return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: channelDispatcherServiceName(ccp.Name), - Namespace: system.Namespace, + Namespace: system.Namespace(), Labels: labels, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(ccp, schema.GroupVersionKind{ diff --git a/pkg/provisioners/provisioner_util_test.go b/pkg/provisioners/provisioner_util_test.go index 7104661f367..cdf2eb724e6 100644 --- a/pkg/provisioners/provisioner_util_test.go +++ b/pkg/provisioners/provisioner_util_test.go @@ -17,7 +17,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "github.com/knative/eventing/pkg/system" + "github.com/knative/pkg/system" + _ "github.com/knative/pkg/system/testing" ) const ( @@ -60,7 +61,7 @@ func TestProvisionerUtils(t *testing.T) { CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) got := &corev1.Service{} - err := client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: system.Namespace, Name: fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName)}, got) + err := client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: system.Namespace(), Name: fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName)}, got) return got, err }, want: makeDispatcherService(), @@ -73,7 +74,7 @@ func TestProvisionerUtils(t *testing.T) { CreateDispatcherService(context.TODO(), client, getNewClusterChannelProvisioner()) got := &corev1.Service{} - err := client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: system.Namespace, Name: fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName)}, got) + err := client.Get(context.TODO(), runtimeClient.ObjectKey{Namespace: system.Namespace(), Name: fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName)}, got) return got, err }, want: func() metav1.Object { @@ -161,7 +162,7 @@ func ClusterChannelProvisionerType() metav1.TypeMeta { func makeDispatcherService() *corev1.Service { return &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ - Namespace: system.Namespace, + Namespace: system.Namespace(), Name: fmt.Sprintf("%s-dispatcher", clusterChannelProvisionerName), OwnerReferences: []metav1.OwnerReference{ { diff --git a/pkg/provisioners/references_test.go b/pkg/provisioners/references_test.go index 04e2b4b9aac..69920eea0f3 100644 --- a/pkg/provisioners/references_test.go +++ b/pkg/provisioners/references_test.go @@ -19,6 +19,8 @@ package provisioners import ( "fmt" "testing" + + _ "github.com/knative/pkg/system/testing" ) const ( diff --git a/test/test_images/logevents/main.go b/test/test_images/logevents/main.go index f40dc2c3fb9..6f91351c04b 100644 --- a/test/test_images/logevents/main.go +++ b/test/test_images/logevents/main.go @@ -28,8 +28,8 @@ type Heartbeat struct { } func handler(ctx context.Context, data map[string]interface{}) { - metadata := cloudevents.FromContext(ctx) - log.Printf("[%s] %s %s: %+v", metadata.EventTime.Format(time.RFC3339), metadata.ContentType, metadata.Source, data) + metadata := cloudevents.FromContext(ctx).AsV02() + log.Printf("[%s] %s %s: %+v", metadata.Time.Format(time.RFC3339), metadata.ContentType, metadata.Source, data) } func main() { diff --git a/vendor/github.com/knative/pkg/apis/duck/v1alpha1/condition_set.go b/vendor/github.com/knative/pkg/apis/duck/v1alpha1/condition_set.go index 954aead9736..1c550ed19ff 100644 --- a/vendor/github.com/knative/pkg/apis/duck/v1alpha1/condition_set.go +++ b/vendor/github.com/knative/pkg/apis/duck/v1alpha1/condition_set.go @@ -280,7 +280,7 @@ func (r conditionsImpl) MarkUnknown(t ConditionType, reason, messageFormat strin // Double check that the happy condition is also false. happy := r.GetCondition(r.happy) if !happy.IsFalse() { - r.MarkFalse(r.happy, reason, messageFormat, messageA) + r.MarkFalse(r.happy, reason, messageFormat, messageA...) } return } @@ -324,9 +324,10 @@ func (r conditionsImpl) MarkFalse(t ConditionType, reason, messageFormat string, // InitializeConditions updates all Conditions in the ConditionSet to Unknown // if not set. func (r conditionsImpl) InitializeConditions() { - for _, t := range append(r.dependents, r.happy) { + for _, t := range r.dependents { r.InitializeCondition(t) } + r.InitializeCondition(r.happy) } // InitializeCondition updates a Condition to Unknown if not set. diff --git a/vendor/github.com/knative/pkg/apis/duck/v1alpha1/conditions_types.go b/vendor/github.com/knative/pkg/apis/duck/v1alpha1/conditions_types.go index 0dbad3397e0..b2fac23d6f1 100644 --- a/vendor/github.com/knative/pkg/apis/duck/v1alpha1/conditions_types.go +++ b/vendor/github.com/knative/pkg/apis/duck/v1alpha1/conditions_types.go @@ -47,8 +47,10 @@ type ConditionSeverity string const ( // ConditionSeverityError specifies that a failure of a condition type - // should be viewed as an error. - ConditionSeverityError ConditionSeverity = "Error" + // should be viewed as an error. As "Error" is the default for conditions + // we use the empty string (coupled with omitempty) to avoid confusion in + // the case where the condition is in state "True" (aka nothing is wrong). + ConditionSeverityError ConditionSeverity = "" // ConditionSeverityWarning specifies that a failure of a condition type // should be viewed as a warning, but that things could still work. ConditionSeverityWarning ConditionSeverity = "Warning" diff --git a/vendor/github.com/knative/pkg/apis/field_error.go b/vendor/github.com/knative/pkg/apis/field_error.go index 4ae02bc1ed1..e3c3ae4e83b 100644 --- a/vendor/github.com/knative/pkg/apis/field_error.go +++ b/vendor/github.com/knative/pkg/apis/field_error.go @@ -192,7 +192,7 @@ func asKey(key string) string { // err([0]).ViaField(bar).ViaField(foo) -> foo.bar.[0] converts to foo.bar[0] // err(bar).ViaIndex(0).ViaField(foo) -> foo.[0].bar converts to foo[0].bar // err(bar).ViaField(foo).ViaIndex(0) -> [0].foo.bar converts to [0].foo.bar -// err(bar).ViaIndex(0).ViaIndex[1].ViaField(foo) -> foo.[1].[0].bar converts to foo[1][0].bar +// err(bar).ViaIndex(0).ViaIndex(1).ViaField(foo) -> foo.[1].[0].bar converts to foo[1][0].bar func flatten(path []string) string { var newPath []string for _, part := range path { @@ -300,6 +300,12 @@ func ErrDisallowedFields(fieldPaths ...string) *FieldError { } } +// ErrInvalidArrayValue consturcts a FieldError for a repetetive `field` +// at `index` that has received an invalid string value. +func ErrInvalidArrayValue(value, field string, index int) *FieldError { + return ErrInvalidValue(value, CurrentField).ViaFieldIndex(field, index) +} + // ErrInvalidValue constructs a FieldError for a field that has received an // invalid string value. func ErrInvalidValue(value, fieldPath string) *FieldError { diff --git a/vendor/github.com/knative/pkg/apis/istio/common/v1alpha1/string.go b/vendor/github.com/knative/pkg/apis/istio/common/v1alpha1/string.go index 52336124d4a..c34c2505317 100644 --- a/vendor/github.com/knative/pkg/apis/istio/common/v1alpha1/string.go +++ b/vendor/github.com/knative/pkg/apis/istio/common/v1alpha1/string.go @@ -28,7 +28,7 @@ type StringMatch struct { Prefix string `json:"prefix,omitempty"` // suffix-based match. - Suffix string `json:"prefix,omitempty"` + Suffix string `json:"suffix,omitempty"` // ECMAscript style regex-based match Regex string `json:"regex,omitempty"` diff --git a/vendor/github.com/knative/pkg/apis/istio/v1alpha3/destinationrule_types.go b/vendor/github.com/knative/pkg/apis/istio/v1alpha3/destinationrule_types.go index eff821123da..d5b0d405070 100644 --- a/vendor/github.com/knative/pkg/apis/istio/v1alpha3/destinationrule_types.go +++ b/vendor/github.com/knative/pkg/apis/istio/v1alpha3/destinationrule_types.go @@ -206,7 +206,7 @@ type PortTrafficPolicy struct { type Subset struct { // REQUIRED. Name of the subset. The service name and the subset name can // be used for traffic splitting in a route rule. - Name string `json:"port"` + Name string `json:"name"` // REQUIRED. Labels apply a filter over the endpoints of a service in the // service registry. See route rules for examples of usage. diff --git a/vendor/github.com/knative/pkg/cloudevents/builder.go b/vendor/github.com/knative/pkg/cloudevents/builder.go new file mode 100644 index 00000000000..4cf3706d964 --- /dev/null +++ b/vendor/github.com/knative/pkg/cloudevents/builder.go @@ -0,0 +1,135 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudevents + +import ( + "fmt" + "net/http" + "time" + + "github.com/google/uuid" +) + +// CloudEventEncoding is used to tell the builder which encoding to select. +// the default is Binary. +type CloudEventEncoding int + +const ( + // Binary v0.1 + BinaryV01 CloudEventEncoding = iota + // Structured v0.1 + StructuredV01 +) + +// Builder holds settings that do not change over CloudEvents. It is intended +// to represent a builder of only a single CloudEvent type. +type Builder struct { + // A URI describing the event producer. + Source string + // Type of occurrence which has happened. + EventType string + // The version of the `eventType`; this is producer-specific. + EventTypeVersion string + // A link to the schema that the `data` attribute adheres to. + SchemaURL string + // Additional metadata without a well-defined structure. + Extensions map[string]interface{} + + // Encoding specifies the requested output encoding of the CloudEvent. + Encoding CloudEventEncoding +} + +// Build produces a http request with the constant data embedded in the builder +// merged with the new data provided in the build function. The request will +// send a pre-assembled cloud event to the given target. The target is assumed +// to be a URL with a scheme, ie: "http://localhost:8080" +func (b *Builder) Build(target string, data interface{}, overrides ...SendContext) (*http.Request, error) { + if len(overrides) > 1 { + return nil, fmt.Errorf("Build was called with more than one override") + } + + var overridesV01 *V01EventContext + if len(overrides) == 1 { + switch t := overrides[0].(type) { + case V01EventContext: + o := overrides[0].(V01EventContext) + overridesV01 = &o + default: + return nil, fmt.Errorf("Build was called with unknown override type %v", t) + } + } + // TODO: when V02 is supported this will have to shuffle a little. + ctx := b.cloudEventsContextV01(overridesV01) + + if ctx.Source == "" { + return nil, fmt.Errorf("ctx.Source resolved empty") + } + if ctx.EventType == "" { + return nil, fmt.Errorf("ctx.EventType resolved empty") + } + + switch b.Encoding { + case BinaryV01: + return Binary.NewRequest(target, data, ctx) + case StructuredV01: + return Structured.NewRequest(target, data, ctx) + default: + return nil, fmt.Errorf("unsupported encoding: %v", b.Encoding) + } +} + +// cloudEventsContext creates a CloudEvent context object, assumes +// application/json as the content type. +func (b *Builder) cloudEventsContextV01(overrides *V01EventContext) V01EventContext { + ctx := V01EventContext{ + CloudEventsVersion: CloudEventsVersion, + EventType: b.EventType, + EventID: uuid.New().String(), + EventTypeVersion: b.EventTypeVersion, + SchemaURL: b.SchemaURL, + Source: b.Source, + ContentType: "application/json", + EventTime: time.Now(), + Extensions: b.Extensions, + } + if overrides != nil { + if overrides.Source != "" { + ctx.Source = overrides.Source + } + if overrides.EventID != "" { + ctx.EventID = overrides.EventID + } + if overrides.EventType != "" { + ctx.EventType = overrides.EventType + } + if !overrides.EventTime.IsZero() { + ctx.EventTime = overrides.EventTime + } + if overrides.ContentType != "" { + ctx.ContentType = overrides.ContentType + } + if len(overrides.Extensions) > 0 { + if ctx.Extensions == nil { + ctx.Extensions = make(map[string]interface{}) + } + for k, v := range overrides.Extensions { + ctx.Extensions[k] = v + } + } + } + return ctx +} diff --git a/vendor/github.com/knative/pkg/cloudevents/client.go b/vendor/github.com/knative/pkg/cloudevents/client.go new file mode 100644 index 00000000000..9846edf1da3 --- /dev/null +++ b/vendor/github.com/knative/pkg/cloudevents/client.go @@ -0,0 +1,81 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudevents + +import ( + "fmt" + "io/ioutil" + "net/http" +) + +// Client wraps Builder, and is intended to be configured for a single event +// type and target +type Client struct { + builder Builder + Target string +} + +// NewClient returns a CloudEvent Client used to send CloudEvents. It is +// intended that a user would create a new client for each tuple of eventType +// and target. This is an optional helper method to avoid the tricky creation +// of the embedded Builder struct. +func NewClient(target string, builder Builder) *Client { + c := &Client{ + builder: builder, + Target: target, + } + return c +} + +// Send creates a request based on the client's settings and sends the data +// struct to the target set for this client. It returns error if there was an +// issue sending the event, otherwise nil means the event was accepted. +func (c *Client) Send(data interface{}, overrides ...SendContext) error { + req, err := c.builder.Build(c.Target, data, overrides...) + if err != nil { + return err + } + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + if accepted(resp) { + return nil + } + return fmt.Errorf("error sending cloudevent: %s", status(resp)) +} + +// accepted is a helper method to understand if the response from the target +// accepted the CloudEvent. +func accepted(resp *http.Response) bool { + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + return true + } + return false +} + +// status is a helper method to read the response of the target. +func status(resp *http.Response) string { + status := resp.Status + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return fmt.Sprintf("Status[%s] error reading response body: %v", status, err) + } + return fmt.Sprintf("Status[%s] %s", status, body) +} diff --git a/vendor/github.com/knative/pkg/cloudevents/doc.go b/vendor/github.com/knative/pkg/cloudevents/doc.go index d33753866f8..62bc3b02ce1 100644 --- a/vendor/github.com/knative/pkg/cloudevents/doc.go +++ b/vendor/github.com/knative/pkg/cloudevents/doc.go @@ -19,5 +19,4 @@ limitations under the License. // https://github.com/cloudevents/spec/blob/v0.1/http-transport-binding.md // and // https://github.com/cloudevents/spec/blob/v0.1/spec.md - package cloudevents diff --git a/vendor/github.com/knative/pkg/cloudevents/encoding_binary.go b/vendor/github.com/knative/pkg/cloudevents/encoding_binary.go index 9b20dfe500a..8ed4fbfaad1 100644 --- a/vendor/github.com/knative/pkg/cloudevents/encoding_binary.go +++ b/vendor/github.com/knative/pkg/cloudevents/encoding_binary.go @@ -20,14 +20,10 @@ package cloudevents import ( "bytes" - "encoding/json" "fmt" "io/ioutil" - "log" "net/http" "net/url" - "strings" - "time" ) const ( @@ -56,7 +52,7 @@ const ( // HeaderSource is the header for the source which emitted this event. HeaderSource = "CE-Source" - // HeaderExtensions is the OPTIONAL header prefix for CloudEvents extensions + // HeaderExtensionsPrefix is the OPTIONAL header prefix for CloudEvents extensions HeaderExtensionsPrefix = "CE-X-" // Binary implements Binary encoding/decoding @@ -65,68 +61,57 @@ const ( type binary int -// FromRequest parses event data and context from an HTTP request. -func (binary) FromRequest(data interface{}, r *http.Request) (*EventContext, error) { - var ctx EventContext - err := anyError( - getRequiredHeader(r.Header, HeaderEventID, &ctx.EventID), - getRequiredHeader(r.Header, HeaderEventType, &ctx.EventType), - getRequiredHeader(r.Header, HeaderSource, &ctx.Source), - getRequiredHeader(r.Header, HeaderContentType, &ctx.ContentType)) - if err != nil { - return nil, err - } +// BinarySender implements an interface for sending an EventContext as +// (possibly one of several versions) as a binary encoding HTTP request. +type BinarySender interface { + // AsHeaders converts this EventContext to a set of HTTP headers. + AsHeaders() (http.Header, error) +} - ctx.CloudEventsVersion = r.Header.Get(HeaderCloudEventsVersion) - if timeStr := r.Header.Get(HeaderEventTime); timeStr != "" { - if ctx.EventTime, err = time.Parse(time.RFC3339Nano, timeStr); err != nil { - return nil, err - } - } - ctx.EventTypeVersion = r.Header.Get(HeaderEventTypeVersion) - ctx.SchemaURL = r.Header.Get(HeaderSchemaURL) - if ctx.CloudEventsVersion != CloudEventsVersion { - log.Printf("Received CloudEvent version %q; parsing as version %q", - ctx.CloudEventsVersion, CloudEventsVersion) - } +// BinaryLoader implements an interface for translating a binary encoding HTTP +// request or response to a an EventContext (possibly one of several versions). +type BinaryLoader interface { + // FromHeaders copies data from the supplied HTTP headers into the object. + // Values will be defaulted if necessary. + FromHeaders(in http.Header) error +} - ctx.Extensions = make(map[string]interface{}) - for k, v := range r.Header { - if strings.ToUpper(k)[:len(HeaderExtensionsPrefix)] != HeaderExtensionsPrefix { - continue - } - name := k[len(HeaderExtensionsPrefix):] - var val interface{} - if err := json.Unmarshal([]byte(v[0]), &val); err != nil { - // If this is not a JSON object, treat it as a string. - // It's not clear when we would treat this as Bytes. - ctx.Extensions[name] = v[0] - } else { - ctx.Extensions[name] = val - } +// FromRequest parses event data and context from an HTTP request. +func (binary) FromRequest(data interface{}, r *http.Request) (LoadContext, error) { + var ec LoadContext + switch { + case r.Header.Get("CE-SpecVersion") == V02CloudEventsVersion: + ec = &V02EventContext{} + case r.Header.Get("CE-CloudEventsVersion") == V01CloudEventsVersion: + ec = &V01EventContext{} + default: + return nil, fmt.Errorf("Could not determine Cloud Events version from header: %+v", r.Header) + } + + if err := ec.FromHeaders(r.Header); err != nil { + return nil, err } - if err := unmarshalEventData(ctx.ContentType, r.Body, data); err != nil { + if err := unmarshalEventData(ec.DataContentType(), r.Body, data); err != nil { return nil, err } - return &ctx, nil + return ec, nil } // NewRequest creates an HTTP request for Binary content encoding. -func (t binary) NewRequest(urlString string, data interface{}, context EventContext) (*http.Request, error) { +func (t binary) NewRequest(urlString string, data interface{}, context SendContext) (*http.Request, error) { url, err := url.Parse(urlString) if err != nil { return nil, err } - h := http.Header{} - err = t.ToHeaders(&context, h) + h, err := context.AsHeaders() if err != nil { return nil, err } - b, err := marshalEventData(context.ContentType, data) + b, err := marshalEventData(h.Get("Content-Type"), data) if err != nil { return nil, err } @@ -138,64 +123,3 @@ func (t binary) NewRequest(urlString string, data interface{}, context EventCont Body: ioutil.NopCloser(bytes.NewReader(b)), }, nil } - -func (binary) ToHeaders(context *EventContext, h http.Header) error { - if context == nil || h == nil { - return nil - } - if err := ensureRequiredFields(*context); err != nil { - return err - } - // Defaultable values: - ceVersion := context.CloudEventsVersion - if ceVersion == "" { - ceVersion = CloudEventsVersion - } - contentType := context.ContentType - if contentType == "" { - contentType = contentTypeJSON - } - - // non-string values: - eventTime := "" - if !context.EventTime.IsZero() { - eventTime = context.EventTime.Format(time.RFC3339Nano) - } - - setHeader(h, HeaderCloudEventsVersion, ceVersion) - setHeader(h, HeaderEventID, context.EventID) - setHeader(h, HeaderEventTime, eventTime) - setHeader(h, HeaderEventType, context.EventType) - setHeader(h, HeaderEventTypeVersion, context.EventTypeVersion) - setHeader(h, HeaderSchemaURL, context.SchemaURL) - setHeader(h, HeaderContentType, contentType) - setHeader(h, HeaderSource, context.Source) - for name, value := range context.Extensions { - encoded, err := json.Marshal(value) - if err != nil { - return err - } - h[HeaderExtensionsPrefix+name] = []string{ - string(encoded), - } - } - - return nil -} - -// TODO(inlined) URI encoding/decoding of headers -func getHeader(h http.Header, name string) string { - return h.Get(name) -} - -func setHeader(h http.Header, name string, value string) { - if value != "" { - h.Set(name, value) - } -} -func getRequiredHeader(h http.Header, name string, value *string) error { - if *value = getHeader(h, name); *value == "" { - return fmt.Errorf("missing required header %q", name) - } - return nil -} diff --git a/vendor/github.com/knative/pkg/cloudevents/encoding_structured.go b/vendor/github.com/knative/pkg/cloudevents/encoding_structured.go index 08c2acab97f..8670241d38f 100644 --- a/vendor/github.com/knative/pkg/cloudevents/encoding_structured.go +++ b/vendor/github.com/knative/pkg/cloudevents/encoding_structured.go @@ -34,73 +34,100 @@ const ( type structured int -type structuredEnvelope struct { - EventContext - RawData json.RawMessage `json:"data"` +// StructuredSender implements an interface for translating an EventContext +// (possibly one of severals versions) to a structured encoding HTTP request. +type StructuredSender interface { + // AsJSON encodes the object into a map from string to JSON data, which + // allows additional keys to be encoded later. + AsJSON() (map[string]json.RawMessage, error) +} + +// StructuredLoader implements an interface for translating a structured +// encoding HTTP request or response to a an EventContext (possibly one of +// several versions). +type StructuredLoader interface { + // FromJSON assumes that the object has already been decoded into a raw map + // from string to json.RawMessage, because this is needed to extract the + // CloudEvents version. + FromJSON(map[string]json.RawMessage) error } // FromRequest parses a CloudEvent from structured content encoding. -func (structured) FromRequest(data interface{}, r *http.Request) (*EventContext, error) { - var e structuredEnvelope - if err := json.NewDecoder(r.Body).Decode(&e); err != nil { +func (structured) FromRequest(data interface{}, r *http.Request) (LoadContext, error) { + raw := make(map[string]json.RawMessage) + if err := json.NewDecoder(r.Body).Decode(&raw); err != nil { return nil, err } - contentType := e.EventContext.ContentType + rawData := raw["data"] + delete(raw, "data") + + var ec LoadContext + v := "" + if err := json.Unmarshal(raw["specversion"], &v); err == nil && v == V02CloudEventsVersion { + ec = &V02EventContext{} + } else if err := json.Unmarshal(raw["cloudEventsVersion"], &v); err == nil && v == V01CloudEventsVersion { + ec = &V01EventContext{} + } else { + return nil, fmt.Errorf("Could not determine Cloud Events version from payload: %q", data) + } + + if err := ec.FromJSON(raw); err != nil { + return nil, err + } + + contentType := ec.DataContentType() if contentType == "" { contentType = contentTypeJSON } var reader io.Reader if !isJSONEncoding(contentType) { var jsonDecoded string - if err := json.Unmarshal(e.RawData, &jsonDecoded); err != nil { - return nil, fmt.Errorf("Could not JSON decode %q value %q", contentType, string(e.RawData)) + if err := json.Unmarshal(rawData, &jsonDecoded); err != nil { + return nil, fmt.Errorf("Could not JSON decode %q value %q", contentType, rawData) } reader = strings.NewReader(jsonDecoded) } else { - reader = bytes.NewReader(e.RawData) - } - if e.EventContext.Extensions == nil { - e.EventContext.Extensions = make(map[string]interface{}, 0) + reader = bytes.NewReader(rawData) } if err := unmarshalEventData(contentType, reader, data); err != nil { return nil, err } - return &e.EventContext, nil + return ec, nil } // NewRequest creates an HTTP request for Structured content encoding. -func (structured) NewRequest(urlString string, data interface{}, context EventContext) (*http.Request, error) { +func (structured) NewRequest(urlString string, data interface{}, context SendContext) (*http.Request, error) { url, err := url.Parse(urlString) if err != nil { return nil, err } - if err := ensureRequiredFields(context); err != nil { + fields, err := context.AsJSON() + if err != nil { return nil, err } - contentType := context.ContentType + // TODO: remove this defaulting? + contentType := context.DataContentType() if contentType == "" { contentType = contentTypeJSON } - e := structuredEnvelope{ - EventContext: context, - } + dataBytes, err := marshalEventData(contentType, data) if err != nil { return nil, err } if isJSONEncoding(contentType) { - e.RawData = json.RawMessage(dataBytes) + fields["data"] = json.RawMessage(dataBytes) } else { - e.RawData, err = json.Marshal(string(dataBytes)) + fields["data"], err = json.Marshal(string(dataBytes)) if err != nil { return nil, err } } - b, err := json.Marshal(e) + b, err := json.Marshal(fields) if err != nil { return nil, err } diff --git a/vendor/github.com/knative/pkg/cloudevents/event.go b/vendor/github.com/knative/pkg/cloudevents/event.go index 70f66e4aa0f..478c1cfe3f3 100644 --- a/vendor/github.com/knative/pkg/cloudevents/event.go +++ b/vendor/github.com/knative/pkg/cloudevents/event.go @@ -24,14 +24,9 @@ import ( "io" "net/http" "reflect" - "time" ) const ( - // CloudEventsVersion is the version of the CloudEvents spec targeted - // by this library. - CloudEventsVersion = "0.1" - // ContentTypeStructuredJSON is the content-type for "Structured" encoding // where an event envelope is written in JSON and the body is arbitrary // data which might be an alternate encoding. @@ -48,43 +43,68 @@ const ( // HeaderContentType is the standard HTTP header "Content-Type" HeaderContentType = "Content-Type" - // required attributes - fieldCloudEventsVersion = "CloudEventsVersion" - fieldEventID = "EventID" - fieldEventType = "EventType" - fieldSource = "Source" + // CloudEventsVersion is a legacy alias of V01CloudEventsVersion, for compatibility. + CloudEventsVersion = V01CloudEventsVersion ) -// EventContext holds standard metadata about an event. See -// https://github.com/cloudevents/spec/blob/v0.1/spec.md#context-attributes for -// details on these fields. -type EventContext struct { - // The version of the CloudEvents specification used by the event. - CloudEventsVersion string `json:"cloudEventsVersion,omitempty"` - // ID of the event; must be non-empty and unique within the scope of the producer. - EventID string `json:"eventID"` - // Timestamp when the event happened. - EventTime time.Time `json:"eventTime,omitempty"` - // Type of occurrence which has happened. - EventType string `json:"eventType"` - // The version of the `eventType`; this is producer-specific. - EventTypeVersion string `json:"eventTypeVersion,omitempty"` - // A link to the schema that the `data` attribute adheres to. - SchemaURL string `json:"schemaURL,omitempty"` - // A MIME (RFC 2046) string describing the media type of `data`. - // TODO: Should an empty string assume `application/json`, or auto-detect the content? - ContentType string `json:"contentType,omitempty"` - // A URI describing the event producer. - Source string `json:"source"` - // Additional metadata without a well-defined structure. - Extensions map[string]interface{} `json:"extensions,omitempty"` -} +// EventContext is a legacy un-versioned alias, from when we thought that field names would stay the same. +type EventContext = V01EventContext // HTTPMarshaller implements a scheme for decoding CloudEvents over HTTP. // Implementations are Binary, Structured, and Any type HTTPMarshaller interface { - FromRequest(data interface{}, r *http.Request) (*EventContext, error) - NewRequest(urlString string, data interface{}, context EventContext) (*http.Request, error) + FromRequest(data interface{}, r *http.Request) (LoadContext, error) + NewRequest(urlString string, data interface{}, context SendContext) (*http.Request, error) +} + +// ContextTranslator provides a set of translation methods between the +// different versions of the CloudEvents spec, which allows programs to +// interoperate with different versions of the CloudEvents spec by +// converting EventContexts to their preferred version. +type ContextTranslator interface { + // AsV01 provides a translation from whatever the "native" encoding of the + // CloudEvent was to the equivalent in v0.1 field names, moving fields to or + // from extensions as necessary. + AsV01() V01EventContext + + // AsV02 provides a translation from whatever the "native" encoding of the + // CloudEvent was to the equivalent in v0.2 field names, moving fields to or + // from extensions as necessary. + AsV02() V02EventContext + + // DataContentType returns the MIME content type for encoding data, which is + // needed by both encoding and decoding. + DataContentType() string +} + +// SendContext provides an interface for extracting information from an +// EventContext (the set of non-data event attributes of a CloudEvent). +type SendContext interface { + ContextTranslator + + StructuredSender + BinarySender +} + +// LoadContext provides an interface for extracting information from an +// EventContext (the set of non-data event attributes of a CloudEvent). +type LoadContext interface { + ContextTranslator + + StructuredLoader + BinaryLoader +} + +// ContextType is a unified interface for both sending and loading the +// CloudEvent data across versions. +type ContextType interface { + ContextTranslator + + StructuredSender + BinarySender + + StructuredLoader + BinaryLoader } func anyError(errs ...error) error { @@ -96,13 +116,6 @@ func anyError(errs ...error) error { return nil } -func ensureRequiredFields(context EventContext) error { - return anyError( - require(fieldEventID, context.EventID), - require(fieldEventType, context.EventType), - require(fieldSource, context.Source)) -} - func require(name string, value string) error { if len(value) == 0 { return fmt.Errorf("missing required field %q", name) @@ -169,7 +182,7 @@ func marshalEventData(encoding string, data interface{}) ([]byte, error) { } // FromRequest parses a CloudEvent from any known encoding. -func FromRequest(data interface{}, r *http.Request) (*EventContext, error) { +func FromRequest(data interface{}, r *http.Request) (LoadContext, error) { switch r.Header.Get(HeaderContentType) { case ContentTypeStructuredJSON: return Structured.FromRequest(data, r) @@ -184,16 +197,16 @@ func FromRequest(data interface{}, r *http.Request) (*EventContext, error) { } // NewRequest craetes an HTTP request for Structured content encoding. -func NewRequest(urlString string, data interface{}, context EventContext) (*http.Request, error) { +func NewRequest(urlString string, data interface{}, context SendContext) (*http.Request, error) { return Structured.NewRequest(urlString, data, context) } -// Opaque key type used to store EventContexts in a context.Context +// Opaque key type used to store V01EventContexts in a context.Context type contextKeyType struct{} var contextKey = contextKeyType{} -// FromContext loads an EventContext from a normal context.Context -func FromContext(ctx context.Context) *EventContext { - return ctx.Value(contextKey).(*EventContext) +// FromContext loads an V01EventContext from a normal context.Context +func FromContext(ctx context.Context) LoadContext { + return ctx.Value(contextKey).(LoadContext) } diff --git a/vendor/github.com/knative/pkg/cloudevents/event_v01.go b/vendor/github.com/knative/pkg/cloudevents/event_v01.go new file mode 100644 index 00000000000..2ba67ca0c0d --- /dev/null +++ b/vendor/github.com/knative/pkg/cloudevents/event_v01.go @@ -0,0 +1,236 @@ +/* +Copyright 2018 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudevents + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "time" +) + +const ( + // V01CloudEventsVersion is the version of the CloudEvents spec targeted + // by this library. + V01CloudEventsVersion = "0.1" + + // v0.1 field names + fieldCloudEventsVersion = "CloudEventsVersion" + fieldEventID = "EventID" + fieldEventType = "EventType" +) + +// V01EventContext holds standard metadata about an event. See +// https://github.com/cloudevents/spec/blob/v0.1/spec.md#context-attributes for +// details on these fields. +type V01EventContext struct { + // The version of the CloudEvents specification used by the event. + CloudEventsVersion string `json:"cloudEventsVersion,omitempty"` + // ID of the event; must be non-empty and unique within the scope of the producer. + EventID string `json:"eventID"` + // Timestamp when the event happened. + EventTime time.Time `json:"eventTime,omitempty"` + // Type of occurrence which has happened. + EventType string `json:"eventType"` + // The version of the `eventType`; this is producer-specific. + EventTypeVersion string `json:"eventTypeVersion,omitempty"` + // A link to the schema that the `data` attribute adheres to. + SchemaURL string `json:"schemaURL,omitempty"` + // A MIME (RFC 2046) string describing the media type of `data`. + // TODO: Should an empty string assume `application/json`, or auto-detect the content? + ContentType string `json:"contentType,omitempty"` + // A URI describing the event producer. + Source string `json:"source"` + // Additional metadata without a well-defined structure. + Extensions map[string]interface{} `json:"extensions,omitempty"` +} + +// AsV01 implements the ContextTranslator interface. +func (ec V01EventContext) AsV01() V01EventContext { + return ec +} + +// AsV02 implements the ContextTranslator interface. +func (ec V01EventContext) AsV02() V02EventContext { + ret := V02EventContext{ + SpecVersion: V02CloudEventsVersion, + Type: ec.EventType, + Source: ec.Source, + ID: ec.EventID, + Time: ec.EventTime, + SchemaURL: ec.SchemaURL, + ContentType: ec.ContentType, + Extensions: make(map[string]interface{}), + } + // eventTypeVersion was retired in v0.2, so put it in an extension. + if ec.EventTypeVersion != "" { + ret.Extensions["eventtypeversion"] = ec.EventTypeVersion + } + for k, v := range ec.Extensions { + ret.Extensions[k] = v + } + return ret +} + +// AsHeaders implements the BinarySender interface. +func (ec V01EventContext) AsHeaders() (http.Header, error) { + h := http.Header{} + h.Set("CE-CloudEventsVersion", ec.CloudEventsVersion) + h.Set("CE-EventID", ec.EventID) + h.Set("CE-EventType", ec.EventType) + h.Set("CE-Source", ec.Source) + if ec.CloudEventsVersion == "" { + h.Set("CE-CloudEventsVersion", V01CloudEventsVersion) + } + if !ec.EventTime.IsZero() { + h.Set("CE-EventTime", ec.EventTime.Format(time.RFC3339Nano)) + } + if ec.EventTypeVersion != "" { + h.Set("CE-EventTypeVersion", ec.EventTypeVersion) + } + if ec.SchemaURL != "" { + h.Set("CE-SchemaUrl", ec.SchemaURL) + } + if ec.ContentType != "" { + h.Set("Content-Type", ec.ContentType) + } + for k, v := range ec.Extensions { + encoded, err := json.Marshal(v) + if err != nil { + return nil, err + } + // Preserve case in v0.1, even though HTTP headers are case-insensitive. + h["CE-X-"+k] = []string{string(encoded)} + } + return h, nil +} + +// FromHeaders implements the BinaryLoader interface. +func (ec *V01EventContext) FromHeaders(in http.Header) error { + missingField := func(name string) error { + if in.Get("CE-"+name) == "" { + return fmt.Errorf("Missing field %q in %v: %q", "CE-"+name, in, in.Get("CE-"+name)) + } + return nil + } + if err := anyError( + missingField("CloudEventsVersion"), + missingField("EventID"), + missingField("EventType"), + missingField("Source")); err != nil { + return err + } + data := V01EventContext{ + CloudEventsVersion: in.Get("CE-CloudEventsVersion"), + EventID: in.Get("CE-EventID"), + EventType: in.Get("CE-EventType"), + EventTypeVersion: in.Get("CE-EventTypeVersion"), + SchemaURL: in.Get("CE-SchemaURL"), + ContentType: in.Get("Content-Type"), + Source: in.Get("CE-Source"), + Extensions: make(map[string]interface{}), + } + if timeStr := in.Get("CE-EventTime"); timeStr != "" { + var err error + if data.EventTime, err = time.Parse(time.RFC3339Nano, timeStr); err != nil { + return err + } + } + for k, v := range in { + if strings.EqualFold(k[:len("CE-X-")], "CE-X-") { + key := k[len("CE-X-"):] + var tmp interface{} + if err := json.Unmarshal([]byte(v[0]), &tmp); err == nil { + data.Extensions[key] = tmp + } else { + // If we can't unmarshal the data, treat it as a string. + data.Extensions[key] = v[0] + } + } + } + *ec = data + return nil +} + +// AsJSON implements the StructuredSender interface. +func (ec V01EventContext) AsJSON() (map[string]json.RawMessage, error) { + ret := make(map[string]json.RawMessage) + err := anyError( + encodeKey(ret, "cloudEventsVersion", ec.CloudEventsVersion), + encodeKey(ret, "eventID", ec.EventID), + encodeKey(ret, "eventTime", ec.EventTime), + encodeKey(ret, "eventType", ec.EventType), + encodeKey(ret, "eventTypeVersion", ec.EventTypeVersion), + encodeKey(ret, "schemaURL", ec.SchemaURL), + encodeKey(ret, "contentType", ec.ContentType), + encodeKey(ret, "source", ec.Source), + encodeKey(ret, "extensions", ec.Extensions)) + return ret, err +} + +// DataContentType implements the StructuredSender interface. +func (ec V01EventContext) DataContentType() string { + return ec.ContentType +} + +// FromJSON implements the StructuredLoader interface. +func (ec *V01EventContext) FromJSON(in map[string]json.RawMessage) error { + data := V01EventContext{ + CloudEventsVersion: extractKey(in, "cloudEventsVersion"), + EventID: extractKey(in, "eventID"), + EventType: extractKey(in, "eventType"), + Source: extractKey(in, "source"), + } + var err error + if timeStr := extractKey(in, "eventTime"); timeStr != "" { + if data.EventTime, err = time.Parse(time.RFC3339Nano, timeStr); err != nil { + return err + } + } + extractKeyTo(in, "eventTypeVersion", &data.EventTypeVersion) + extractKeyTo(in, "schemaURL", &data.SchemaURL) + extractKeyTo(in, "contentType", &data.ContentType) + if len(in["extensions"]) == 0 { + in["extensions"] = []byte("{}") + } + if err = json.Unmarshal(in["extensions"], &data.Extensions); err != nil { + return err + } + *ec = data + return nil +} + +func encodeKey(out map[string]json.RawMessage, key string, value interface{}) (err error) { + if s, ok := value.(string); ok && s == "" { + // Skip empty strings. + return nil + } + out[key], err = json.Marshal(value) + return +} + +func extractKey(in map[string]json.RawMessage, key string) (s string) { + extractKeyTo(in, key, &s) + return +} + +func extractKeyTo(in map[string]json.RawMessage, key string, out *string) error { + tmp := in[key] + delete(in, key) + return json.Unmarshal(tmp, out) +} diff --git a/vendor/github.com/knative/pkg/cloudevents/event_v02.go b/vendor/github.com/knative/pkg/cloudevents/event_v02.go new file mode 100644 index 00000000000..dc64767cc59 --- /dev/null +++ b/vendor/github.com/knative/pkg/cloudevents/event_v02.go @@ -0,0 +1,261 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudevents + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" + "time" +) + +const ( + // V02CloudEventsVersion is the version of the CloudEvents spec targeted + // by this library. + V02CloudEventsVersion = "0.2" + + // required attributes + fieldSpecVersion = "specversion" + fieldID = "id" + fieldType = "type" + fieldSource = "source" + fieldTime = "time" + fieldSchemaURL = "schemaurl" + fieldContentType = "contenttype" + headerContentType = "Content-Type" +) + +// V02EventContext represents the non-data attributes of a CloudEvents v0.2 +// event. +type V02EventContext struct { + // The version of the CloudEvents specification used by the event. + SpecVersion string `json:"specversion"` + // The type of the occurrence which has happened. + Type string `json:"type"` + // A URI describing the event producer. + Source string `json:"source"` + // ID of the event; must be non-empty and unique within the scope of the producer. + ID string `json:"id"` + // Timestamp when the event happened. + Time time.Time `json:"time,omitempty"` + // A link to the schema that the `data` attribute adheres to. + SchemaURL string `json:"schemaurl,omitempty"` + // A MIME (RFC2046) string describing the media type of `data`. + // TODO: Should an empty string assume `application/json`, `application/octet-stream`, or auto-detect the content? + ContentType string `json:"contenttype,omitempty"` + // Additional extension metadata beyond the base spec. + Extensions map[string]interface{} `json:"-,omitempty"` +} + +// AsV01 implements the ContextTranslator interface. +func (ec V02EventContext) AsV01() V01EventContext { + ret := V01EventContext{ + CloudEventsVersion: V01CloudEventsVersion, + EventID: ec.ID, + EventTime: ec.Time, + EventType: ec.Type, + SchemaURL: ec.SchemaURL, + ContentType: ec.ContentType, + Source: ec.Source, + Extensions: make(map[string]interface{}), + } + for k, v := range ec.Extensions { + // eventTypeVersion was retired in v0.2 + if strings.EqualFold(k, "eventTypeVersion") { + etv, ok := v.(string) + if ok { + ret.EventTypeVersion = etv + } + continue + } + ret.Extensions[k] = v + } + return ret +} + +// AsV02 implements the ContextTranslator interface. +func (ec V02EventContext) AsV02() V02EventContext { + return ec +} + +// AsHeaders implements the BinarySender interface. +func (ec V02EventContext) AsHeaders() (http.Header, error) { + h := http.Header{} + h.Set("CE-"+fieldSpecVersion, ec.SpecVersion) + h.Set("CE-"+fieldType, ec.Type) + h.Set("CE-"+fieldSource, ec.Source) + h.Set("CE-"+fieldID, ec.ID) + if ec.SpecVersion == "" { + h.Set("CE-"+fieldSpecVersion, V02CloudEventsVersion) + } + if !ec.Time.IsZero() { + h.Set("CE-"+fieldTime, ec.Time.Format(time.RFC3339Nano)) + } + if ec.SchemaURL != "" { + h.Set("CE-"+fieldSchemaURL, ec.SchemaURL) + } + if ec.ContentType != "" { + h.Set(headerContentType, ec.ContentType) + } + for k, v := range ec.Extensions { + // Per spec, map-valued extensions are converted to a list of headers as: + // CE-attrib-key + if mapVal, ok := v.(map[string]interface{}); ok { + for subkey, subval := range mapVal { + encoded, err := json.Marshal(subval) + if err != nil { + return nil, err + } + h.Set("CE-"+k+"-"+subkey, string(encoded)) + } + continue + } + encoded, err := json.Marshal(v) + if err != nil { + return nil, err + } + h.Set("CE-"+k, string(encoded)) + } + + return h, nil +} + +// FromHeaders implements the BinaryLoader interface. +func (ec *V02EventContext) FromHeaders(in http.Header) error { + missingField := func(name string) error { + if in.Get("CE-"+name) == "" { + return fmt.Errorf("Missing field %q in %v: %q", "CE-"+name, in, in.Get("CE-"+name)) + } + return nil + } + err := anyError( + missingField(fieldSpecVersion), + missingField(fieldID), + missingField(fieldType), + missingField(fieldSource), + ) + if err != nil { + return err + } + data := V02EventContext{ + ContentType: in.Get(headerContentType), + Extensions: make(map[string]interface{}), + } + // Extensions and top-level fields are mixed under "CE-" headers. + // Extract them all here rather than trying to clear fields in headers. + for k, v := range in { + if strings.EqualFold(k[:len("CE-")], "CE-") { + key, value := strings.ToLower(string(k[len("CE-"):])), v[0] + switch key { + case fieldSpecVersion: + data.SpecVersion = value + case fieldType: + data.Type = value + case fieldSource: + data.Source = value + case fieldID: + data.ID = value + case fieldSchemaURL: + data.SchemaURL = value + case fieldTime: + if data.Time, err = time.Parse(time.RFC3339Nano, value); err != nil { + return err + } + default: + var tmp interface{} + if err = json.Unmarshal([]byte(value), &tmp); err != nil { + tmp = value + } + // Per spec, map-valued extensions are converted to a list of headers as: + // CE-attrib-key. This is where things get a bit crazy... see + // https://github.com/cloudevents/spec/issues/367 for additional notes. + if strings.Contains(key, "-") { + items := strings.SplitN(key, "-", 2) + key, subkey := items[0], items[1] + if _, ok := data.Extensions[key]; !ok { + data.Extensions[key] = make(map[string]interface{}) + } + if submap, ok := data.Extensions[key].(map[string]interface{}); ok { + submap[subkey] = tmp + } + } else { + data.Extensions[key] = tmp + } + } + } + } + *ec = data + return nil +} + +// AsJSON implementsn the StructuredSender interface. +func (ec V02EventContext) AsJSON() (map[string]json.RawMessage, error) { + ret := make(map[string]json.RawMessage) + err := anyError( + encodeKey(ret, fieldSpecVersion, ec.SpecVersion), + encodeKey(ret, fieldType, ec.Type), + encodeKey(ret, fieldSource, ec.Source), + encodeKey(ret, fieldID, ec.ID), + encodeKey(ret, fieldTime, ec.Time), + encodeKey(ret, fieldSchemaURL, ec.SchemaURL), + encodeKey(ret, fieldContentType, ec.ContentType), + ) + if err != nil { + return nil, err + } + for k, v := range ec.Extensions { + if err = encodeKey(ret, k, v); err != nil { + return nil, err + } + } + return ret, nil +} + +// DataContentType implements the StructuredSender interface. +func (ec V02EventContext) DataContentType() string { + return ec.ContentType +} + +// FromJSON implements the StructuredLoader interface. +func (ec *V02EventContext) FromJSON(in map[string]json.RawMessage) error { + data := V02EventContext{ + SpecVersion: extractKey(in, fieldSpecVersion), + Type: extractKey(in, fieldType), + Source: extractKey(in, fieldSource), + ID: extractKey(in, fieldID), + Extensions: make(map[string]interface{}), + } + var err error + if timeStr := extractKey(in, fieldTime); timeStr != "" { + if data.Time, err = time.Parse(time.RFC3339Nano, timeStr); err != nil { + return err + } + } + extractKeyTo(in, fieldSchemaURL, &data.SchemaURL) + extractKeyTo(in, fieldContentType, &data.ContentType) + // Extract the remaining items from in by converting to JSON and then + // unpacking into Extensions. This avoids having to do funny type + // checking/testing in the loop over values. + extensionsJSON, err := json.Marshal(in) + if err != nil { + return err + } + err = json.Unmarshal(extensionsJSON, &data.Extensions) + *ec = data + return err +} diff --git a/vendor/github.com/knative/pkg/cloudevents/handler.go b/vendor/github.com/knative/pkg/cloudevents/handler.go index 628d073f246..3b8575a1cfc 100644 --- a/vendor/github.com/knative/pkg/cloudevents/handler.go +++ b/vendor/github.com/knative/pkg/cloudevents/handler.go @@ -19,6 +19,7 @@ package cloudevents import ( "context" "encoding/json" + "errors" "fmt" "io" "log" @@ -47,7 +48,7 @@ type errAndHandler interface { const ( inParamUsage = "Expected a function taking either no parameters, a context.Context, or (context.Context, any)" - outParamUsage = "Expected a function returning either nothing, an error, (any, error), or (any, EventContext, error)" + outParamUsage = "Expected a function returning either nothing, an error, (any, error), or (any, SendContext, error)" ) var ( @@ -59,9 +60,9 @@ var ( // it leaves this stack frame. The workaround is to pass a pointer to an interface and then // get the type of its reference. // For example, see: https://play.golang.org/p/_dxLvdkvqvg - contextType = reflect.TypeOf((*context.Context)(nil)).Elem() - errorType = reflect.TypeOf((*error)(nil)).Elem() - eventContextType = reflect.TypeOf((*EventContext)(nil)).Elem() + contextType = reflect.TypeOf((*context.Context)(nil)).Elem() + errorType = reflect.TypeOf((*error)(nil)).Elem() + sendContextType = reflect.TypeOf((*SendContext)(nil)).Elem() ) // Verifies that the inputs to a function have a valid signature; panics otherwise. @@ -90,8 +91,8 @@ func validateOutParamSignature(fnType reflect.Type) error { switch fnType.NumOut() { case 3: contextType := fnType.Out(1) - if !contextType.ConvertibleTo(eventContextType) { - return fmt.Errorf("%s; cannot convert return type 1 from %s to EventContext", outParamUsage, contextType) + if !contextType.ConvertibleTo(sendContextType) { + return fmt.Errorf("%s; cannot convert return type 1 from %s to SendContext", outParamUsage, contextType) } fallthrough case 2: @@ -114,7 +115,7 @@ func validateOutParamSignature(fnType reflect.Type) error { // of allowed types. If successful, returns the expected in-param type, otherwise panics. func validateFunction(fnType reflect.Type) errAndHandler { if fnType.Kind() != reflect.Func { - return &failedHandler{err: fmt.Errorf("Must pass a function to handle events")} + return &failedHandler{err: errors.New("must pass a function to handle events")} } err := anyError( validateInParamSignature(fnType), @@ -144,7 +145,7 @@ func allocate(t reflect.Type) (asPtr interface{}, asValue reflect.Value) { return } -func unwrapReturnValues(res []reflect.Value) (interface{}, *EventContext, error) { +func unwrapReturnValues(res []reflect.Value) (interface{}, SendContext, error) { switch len(res) { case 0: return nil, nil, nil @@ -162,8 +163,8 @@ func unwrapReturnValues(res []reflect.Value) (interface{}, *EventContext, error) return nil, nil, res[1].Interface().(error) case 3: if res[2].IsNil() { - ec := res[1].Interface().(EventContext) - return res[0].Interface(), &ec, nil + ec := res[1].Interface().(SendContext) + return res[0].Interface(), ec, nil } return nil, nil, res[2].Interface().(error) default: @@ -191,7 +192,7 @@ func respondHTTP(outparams []reflect.Value, fn reflect.Value, w http.ResponseWri if eventType == "" { eventType = "dev.knative.pkg.cloudevents.unknown" } - ec = &EventContext{ + ec = &V01EventContext{ EventID: uuid.New().String(), EventType: eventType, Source: "unknown", // TODO: anything useful here, maybe incoming Host header? @@ -206,13 +207,17 @@ func respondHTTP(outparams []reflect.Value, fn reflect.Value, w http.ResponseWri w.Write([]byte(`Internal server error`)) return } - err = Binary.ToHeaders(ec, w.Header()) + headers, err := ec.AsHeaders() if err != nil { - log.Printf("Failed to marshal headers for response: %+v: %s", ec, err) + log.Printf("Failed to marshal event context %+v: %s", res, err) w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(`Internal server error`)) + w.Write([]byte("Internal server error")) return } + for k, v := range headers { + w.Header()[k] = v + } + w.Write(json) return } @@ -364,11 +369,13 @@ func (m Mux) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } - h := m[eventContext.EventType] + c := eventContext.AsV01() + + h := m[c.EventType] if h == nil { - log.Print("Cloud not find handler for event type", eventContext.EventType) + log.Print("Cloud not find handler for event type", c.EventType) w.WriteHeader(http.StatusInternalServerError) - w.Write([]byte(fmt.Sprintf("Event type %q is not supported", eventContext.EventType))) + w.Write([]byte(fmt.Sprintf("Event type %q is not supported", c.EventType))) return } @@ -380,7 +387,7 @@ func (m Mux) ServeHTTP(w http.ResponseWriter, r *http.Request) { } if h.numIn == 2 { dataPtr, dataArg := allocate(h.dataType) - if err := unmarshalEventData(eventContext.ContentType, rawData, dataPtr); err != nil { + if err := unmarshalEventData(c.ContentType, rawData, dataPtr); err != nil { log.Print("Failed to parse event data", err) w.WriteHeader(http.StatusBadRequest) w.Write([]byte(`Invalid request`)) diff --git a/vendor/github.com/knative/pkg/logging/config.go b/vendor/github.com/knative/pkg/logging/config.go index c03c193148b..ad6d616f02a 100644 --- a/vendor/github.com/knative/pkg/logging/config.go +++ b/vendor/github.com/knative/pkg/logging/config.go @@ -105,23 +105,48 @@ type Config struct { LoggingLevel map[string]zapcore.Level } +const defaultZLC = `{ + "level": "info", + "development": false, + "outputPaths": ["stdout"], + "errorOutputPaths": ["stderr"], + "encoding": "json", + "encoderConfig": { + "timeKey": "ts", + "levelKey": "level", + "nameKey": "logger", + "callerKey": "caller", + "messageKey": "msg", + "stacktraceKey": "stacktrace", + "lineEnding": "", + "levelEncoder": "", + "timeEncoder": "iso8601", + "durationEncoder": "", + "callerEncoder": "" + } +}` + // NewConfigFromMap creates a LoggingConfig from the supplied map, // expecting the given list of components. func NewConfigFromMap(data map[string]string, components ...string) (*Config, error) { lc := &Config{} if zlc, ok := data["zap-logger-config"]; ok { lc.LoggingConfig = zlc + } else { + lc.LoggingConfig = defaultZLC } + lc.LoggingLevel = make(map[string]zapcore.Level) for _, component := range components { - if ll, ok := data["loglevel."+component]; ok { - if len(ll) > 0 { - level, err := levelFromString(ll) - if err != nil { - return nil, err - } - lc.LoggingLevel[component] = *level + if ll := data["loglevel."+component]; len(ll) > 0 { + level, err := levelFromString(ll) + if err != nil { + return nil, err } + lc.LoggingLevel[component] = *level + } else { + // We default components to INFO + lc.LoggingLevel[component] = zapcore.InfoLevel } } return lc, nil diff --git a/pkg/system/names.go b/vendor/github.com/knative/pkg/system/clock.go similarity index 71% rename from pkg/system/names.go rename to vendor/github.com/knative/pkg/system/clock.go index d07a7849171..7d99d9b5cdf 100644 --- a/pkg/system/names.go +++ b/vendor/github.com/knative/pkg/system/clock.go @@ -1,5 +1,5 @@ /* -Copyright 2018 The Knative Authors +Copyright 2019 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -16,8 +16,17 @@ limitations under the License. package system -const ( - // Namespace holds the K8s namespace where our eventing system - // components run. - Namespace = "knative-eventing" +import ( + "time" ) + +// Mockable interface for time based testing +type Clock interface { + Now() time.Time +} + +type RealClock struct{} + +func (RealClock) Now() time.Time { + return time.Now() +} diff --git a/vendor/github.com/knative/pkg/system/names.go b/vendor/github.com/knative/pkg/system/names.go new file mode 100644 index 00000000000..fdd6a576eac --- /dev/null +++ b/vendor/github.com/knative/pkg/system/names.go @@ -0,0 +1,52 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package system + +import ( + "fmt" + "os" +) + +const ( + NamespaceEnvKey = "SYSTEM_NAMESPACE" +) + +// Namespace holds the K8s namespace where our serving system +// components run. +func Namespace() string { + if ns := os.Getenv(NamespaceEnvKey); ns != "" { + return ns + } + + panic(fmt.Sprintf(`The environment variable %q is not set + +If this is a process running on Kubernetes, then it should be using the downward +API to initialize this variable via: + + env: + - name: %s + valueFrom: + fieldRef: + fieldPath: metadata.namespace + +If this is a Go unit test consuming system.Namespace() then it should add the +following import: + +import ( + _ "github.com/knative/pkg/system/testing" +)`, NamespaceEnvKey, NamespaceEnvKey)) +} diff --git a/vendor/github.com/knative/pkg/system/testing/names.go b/vendor/github.com/knative/pkg/system/testing/names.go new file mode 100644 index 00000000000..ac4945a969f --- /dev/null +++ b/vendor/github.com/knative/pkg/system/testing/names.go @@ -0,0 +1,27 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "os" + + "github.com/knative/pkg/system" +) + +func init() { + os.Setenv(system.NamespaceEnvKey, "knative-testing") +} diff --git a/vendor/github.com/knative/pkg/test/clients.go b/vendor/github.com/knative/pkg/test/clients.go index c740160d362..f6de7c6383c 100644 --- a/vendor/github.com/knative/pkg/test/clients.go +++ b/vendor/github.com/knative/pkg/test/clients.go @@ -35,7 +35,7 @@ type KubeClient struct { // NewSpoofingClient returns a spoofing client to make requests func NewSpoofingClient(client *KubeClient, logger *logging.BaseLogger, domain string, resolvable bool) (*spoof.SpoofingClient, error) { - return spoof.New(client.Kube, logger, domain, resolvable) + return spoof.New(client.Kube, logger, domain, resolvable, Flags.IngressEndpoint) } // NewKubeClient instantiates and returns several clientsets required for making request to the diff --git a/vendor/github.com/knative/pkg/test/e2e_flags.go b/vendor/github.com/knative/pkg/test/e2e_flags.go index 1cd0fada634..46bc70751f4 100644 --- a/vendor/github.com/knative/pkg/test/e2e_flags.go +++ b/vendor/github.com/knative/pkg/test/e2e_flags.go @@ -32,11 +32,12 @@ var Flags = initializeFlags() // EnvironmentFlags define the flags that are needed to run the e2e tests. type EnvironmentFlags struct { - Cluster string // K8s cluster (defaults to $K8S_CLUSTER_OVERRIDE) - Kubeconfig string // Path to kubeconfig (defaults to ./kube/config) - Namespace string // K8s namespace (blank by default, to be overwritten by test suite) - LogVerbose bool // Enable verbose logging - EmitMetrics bool // Emit metrics + Cluster string // K8s cluster (defaults to $K8S_CLUSTER_OVERRIDE) + Kubeconfig string // Path to kubeconfig (defaults to ./kube/config) + Namespace string // K8s namespace (blank by default, to be overwritten by test suite) + IngressEndpoint string // Host to use for ingress endpoint + LogVerbose bool // Enable verbose logging + EmitMetrics bool // Emit metrics } func initializeFlags() *EnvironmentFlags { @@ -56,6 +57,8 @@ func initializeFlags() *EnvironmentFlags { flag.StringVar(&f.Namespace, "namespace", "", "Provide the namespace you would like to use for these tests.") + flag.StringVar(&f.IngressEndpoint, "ingressendpoint", "", "Provide a static endpoint url to the ingress server used during tests.") + flag.BoolVar(&f.LogVerbose, "logverbose", false, "Set this flag to true if you would like to see verbose logging.") diff --git a/vendor/github.com/knative/pkg/test/kube_checks.go b/vendor/github.com/knative/pkg/test/kube_checks.go index 0cc518ff6d6..9851487df8c 100644 --- a/vendor/github.com/knative/pkg/test/kube_checks.go +++ b/vendor/github.com/knative/pkg/test/kube_checks.go @@ -29,6 +29,7 @@ import ( apiv1beta1 "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" + k8styped "k8s.io/client-go/kubernetes/typed/core/v1" ) const ( @@ -73,3 +74,15 @@ func WaitForPodListState(client *KubeClient, inState func(p *corev1.PodList) (bo return inState(p) }) } + +// GetConfigMap gets the configmaps for a given namespace +func GetConfigMap(client *KubeClient, namespace string) k8styped.ConfigMapInterface { + return client.Kube.CoreV1().ConfigMaps(namespace) +} + +// Returns a func that evaluates if a deployment has scaled to 0 pods +func DeploymentScaledToZeroFunc() func(d *apiv1beta1.Deployment) (bool, error) { + return func(d *apiv1beta1.Deployment) (bool, error) { + return d.Status.ReadyReplicas == 0, nil + } +} diff --git a/vendor/github.com/knative/pkg/test/logging/logging.go b/vendor/github.com/knative/pkg/test/logging/logging.go index b5166b207ec..f6966be31b5 100644 --- a/vendor/github.com/knative/pkg/test/logging/logging.go +++ b/vendor/github.com/knative/pkg/test/logging/logging.go @@ -24,6 +24,7 @@ import ( "fmt" "time" + "github.com/davecgh/go-spew/spew" "github.com/golang/glog" "github.com/knative/pkg/logging" "go.opencensus.io/stats/view" @@ -58,9 +59,9 @@ type BaseLogger struct { // ExportView will emit the view data vd (i.e. the stats that have been // recorded) to the zap logger. func (e *zapMetricExporter) ExportView(vd *view.Data) { - // We are not curretnly consuming these metrics, so for now we'll juse + // We are not currently consuming these metrics, so for now we'll juse // dump the view.Data object as is. - e.logger.Info(vd) + e.logger.Debug(spew.Sprint(vd)) } // ExportSpan will emit the trace data to the zap logger. diff --git a/vendor/github.com/knative/pkg/test/spoof/spoof.go b/vendor/github.com/knative/pkg/test/spoof/spoof.go index ded84b0bd3a..90124712889 100644 --- a/vendor/github.com/knative/pkg/test/spoof/spoof.go +++ b/vendor/github.com/knative/pkg/test/spoof/spoof.go @@ -26,10 +26,11 @@ import ( "io/ioutil" "net" "net/http" + "os" "time" "github.com/knative/pkg/test/logging" - zipkin "github.com/knative/pkg/test/zipkin" + "github.com/knative/pkg/test/zipkin" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -44,14 +45,10 @@ const ( requestInterval = 1 * time.Second requestTimeout = 5 * time.Minute // TODO(tcnghia): These probably shouldn't be hard-coded here? - ingressNamespace = "istio-system" + istioIngressNamespace = "istio-system" + istioIngressName = "istio-ingressgateway" ) -// Temporary work around the upgrade test issue for knative/serving#2434. -// TODO(lichuqiang): remove the backward compatibility for knative-ingressgateway -// once knative/serving#2434 is merged -var ingressNames = []string{"knative-ingressgateway", "istio-ingressgateway"} - // Response is a stripped down subset of http.Response. The is primarily useful // for ResponseCheckers to inspect the response body without consuming it. // Notably, Body is a byte slice instead of an io.ReadCloser. @@ -96,7 +93,7 @@ type SpoofingClient struct { // follow the ingress if it moves (or if there are multiple ingresses). // // If that's a problem, see test/request.go#WaitForEndpointState for oneshot spoofing. -func New(kubeClientset *kubernetes.Clientset, logger *logging.BaseLogger, domain string, resolvable bool) (*SpoofingClient, error) { +func New(kubeClientset *kubernetes.Clientset, logger *logging.BaseLogger, domain string, resolvable bool, endpointOverride string) (*SpoofingClient, error) { sc := SpoofingClient{ Client: &http.Client{Transport: &ochttp.Transport{Propagation: &b3.HTTPFormat{}}}, // Using ochttp Transport required for zipkin-tracing RequestInterval: requestInterval, @@ -105,12 +102,16 @@ func New(kubeClientset *kubernetes.Clientset, logger *logging.BaseLogger, domain } if !resolvable { - // If the domain that the Route controller is configured to assign to Route.Status.Domain - // (the domainSuffix) is not resolvable, we need to retrieve the IP of the endpoint and - // spoof the Host in our requests. - e, err := GetServiceEndpoint(kubeClientset) - if err != nil { - return nil, err + e := &endpointOverride + if endpointOverride == "" { + var err error + // If the domain that the Route controller is configured to assign to Route.Status.Domain + // (the domainSuffix) is not resolvable, we need to retrieve the IP of the endpoint and + // spoof the Host in our requests. + e, err = GetServiceEndpoint(kubeClientset) + if err != nil { + return nil, err + } } sc.endpoint = *e @@ -125,24 +126,24 @@ func New(kubeClientset *kubernetes.Clientset, logger *logging.BaseLogger, domain // GetServiceEndpoint gets the endpoint IP or hostname to use for the service. func GetServiceEndpoint(kubeClientset *kubernetes.Clientset) (*string, error) { - var err error - - for _, ingressName := range ingressNames { - var ingress *v1.Service - ingress, err = kubeClientset.CoreV1().Services(ingressNamespace).Get(ingressName, metav1.GetOptions{}) - if err != nil { - continue - } - - var endpoint string - endpoint, err = endpointFromService(ingress) - if err != nil { - continue - } + ingressName := istioIngressName + if gatewayOverride := os.Getenv("GATEWAY_OVERRIDE"); gatewayOverride != "" { + ingressName = gatewayOverride + } + ingressNamespace := istioIngressNamespace + if gatewayNsOverride := os.Getenv("GATEWAY_NAMESPACE_OVERRIDE"); gatewayNsOverride != "" { + ingressNamespace = gatewayNsOverride + } - return &endpoint, nil + ingress, err := kubeClientset.CoreV1().Services(ingressNamespace).Get(ingressName, metav1.GetOptions{}) + if err != nil { + return nil, err } - return nil, err + endpoint, err := endpointFromService(ingress) + if err != nil { + return nil, err + } + return &endpoint, nil } // endpointFromService extracts the endpoint from the service's ingress. @@ -240,6 +241,7 @@ func (sc *SpoofingClient) LogZipkinTrace(traceID string) error { if err != nil { return fmt.Errorf("Error retrieving Zipkin trace: %v", err) } + defer resp.Body.Close() trace, err := ioutil.ReadAll(resp.Body) if err != nil { @@ -247,11 +249,10 @@ func (sc *SpoofingClient) LogZipkinTrace(traceID string) error { } var prettyJSON bytes.Buffer - error := json.Indent(&prettyJSON, trace, "", "\t") - if error != nil { + if error := json.Indent(&prettyJSON, trace, "", "\t"); error != nil { return fmt.Errorf("JSON Parser Error while trying for Pretty-Format: %v, Original Response: %s", error, string(trace)) } - sc.logger.Infof(prettyJSON.String()) + sc.logger.Info(prettyJSON.String()) return nil } diff --git a/vendor/github.com/knative/pkg/test/zipkin/util.go b/vendor/github.com/knative/pkg/test/zipkin/util.go index a06df7e2a51..9dcb6388c21 100644 --- a/vendor/github.com/knative/pkg/test/zipkin/util.go +++ b/vendor/github.com/knative/pkg/test/zipkin/util.go @@ -50,8 +50,10 @@ const ( var zipkinPortForwardPID int -// SetupZipkinTracing sets up zipkin tracing which involves a) Setting up port-forwarding from localhost to zipkin pod on the cluster (pid of the process doing Port-Forward is stored in a global variable). -// b) enable AlwaysSample config for tracing. +// SetupZipkinTracing sets up zipkin tracing which involves: +// 1. Setting up port-forwarding from localhost to zipkin pod on the cluster +// (pid of the process doing Port-Forward is stored in a global variable). +// 2. Enable AlwaysSample config for tracing. func SetupZipkinTracing(kubeClientset *kubernetes.Clientset) error { logger := logging.GetContextLogger("SpoofUtil") @@ -61,7 +63,7 @@ func SetupZipkinTracing(kubeClientset *kubernetes.Clientset) error { zipkinPods, err := kubeClientset.CoreV1().Pods(ZipkinNamespace).List(metav1.ListOptions{LabelSelector: "app=zipkin"}) if err != nil { - return fmt.Errorf("Error retrieving Zipkin pod details : %v", err) + return fmt.Errorf("Error retrieving Zipkin pod details: %v", err) } if len(zipkinPods.Items) == 0 { @@ -70,8 +72,7 @@ func SetupZipkinTracing(kubeClientset *kubernetes.Clientset) error { portForwardCmd := exec.Command("kubectl", "port-forward", "--namespace="+ZipkinNamespace, zipkinPods.Items[0].Name, fmt.Sprintf("%d:%d", ZipkinPort, ZipkinPort)) if err = portForwardCmd.Start(); err != nil { - return fmt.Errorf("Error starting kubectl port-forward command : %v", err) - + return fmt.Errorf("Error starting kubectl port-forward command: %v", err) } zipkinPortForwardPID = portForwardCmd.Process.Pid logger.Infof("Zipkin port-forward process started with PID: %d", zipkinPortForwardPID) @@ -79,7 +80,7 @@ func SetupZipkinTracing(kubeClientset *kubernetes.Clientset) error { // Applying AlwaysSample config to ensure we propagate zipkin header for every request made by this client. trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()}) - logger.Infof("Successfully setup SpoofingClient for Zipkin Tracing") + logger.Info("Successfully setup SpoofingClient for Zipkin Tracing") return nil } @@ -93,7 +94,7 @@ func CleanupZipkinTracingSetup() error { return fmt.Errorf("Encoutered error killing port-forward process in CleanupZipkingTracingSetup() : %v", err) } - logger.Infof("Successfully killed Zipkin port-forward process") + logger.Info("Successfully killed Zipkin port-forward process") return nil } diff --git a/vendor/github.com/knative/pkg/webhook/webhook.go b/vendor/github.com/knative/pkg/webhook/webhook.go index 69fa2af9b30..c30ba65a556 100644 --- a/vendor/github.com/knative/pkg/webhook/webhook.go +++ b/vendor/github.com/knative/pkg/webhook/webhook.go @@ -202,7 +202,7 @@ func getOrGenerateKeyCertsFromSecret(ctx context.Context, client kubernetes.Inte // Validate checks whether "new" and "old" implement HasImmutableFields and checks them, // it then delegates validation to apis.Validatable on "new". -func Validate(ctx context.Context) ResourceCallback { +func Validate(_ context.Context) ResourceCallback { return func(patches *[]jsonpatch.JsonPatchOperation, old GenericCRD, new GenericCRD) error { if immutableNew, ok := new.(apis.Immutable); ok && old != nil { // Copy the old object and set defaults so that we don't reject our own @@ -227,7 +227,7 @@ func Validate(ctx context.Context) ResourceCallback { } // SetDefaults simply leverages apis.Defaultable to set defaults. -func SetDefaults(ctx context.Context) ResourceDefaulter { +func SetDefaults(_ context.Context) ResourceDefaulter { return func(patches *[]jsonpatch.JsonPatchOperation, crd GenericCRD) error { before, after := crd.DeepCopyObject(), crd after.SetDefaults() @@ -527,9 +527,17 @@ func (ac *AdmissionController) mutate(ctx context.Context, kind metav1.GroupVers var patches []jsonpatch.JsonPatchOperation - err := updateGeneration(ctx, &patches, oldObj, newObj) + // Add these before defaulting fields, otherwise defaulting may cause an illegal patch because + // it expects the round tripped through Golang fields to be present already. + rtp, err := roundTripPatch(newBytes, newObj) if err != nil { - logger.Error("Failed to update generation", zap.Error(err)) + return nil, fmt.Errorf("cannot create patch for round tripped newBytes: %v", err) + } + patches = append(patches, rtp...) + + err = updateGeneration(ctx, &patches, oldObj, newObj) + if err != nil { + logger.Errorw("Failed to update generation", zap.Error(err)) return nil, fmt.Errorf("Failed to update generation: %s", err) } @@ -592,6 +600,24 @@ func updateGeneration(ctx context.Context, patches *[]jsonpatch.JsonPatchOperati return nil } +// roundTripPatch generates the JSONPatch that corresponds to round tripping the given bytes through +// the Golang type (JSON -> Golang type -> JSON). Because it is not always true that +// bytes == json.Marshal(json.Unmarshal(bytes)). +// +// For example, if bytes did not contain a 'spec' field and the Golang type specifies its 'spec' +// field without omitempty, then by round tripping through the Golang type, we would have added +// `'spec': {}`. +func roundTripPatch(bytes []byte, unmarshalled interface{}) (duck.JSONPatch, error) { + if unmarshalled == nil { + return duck.JSONPatch{}, nil + } + marshaledBytes, err := json.Marshal(unmarshalled) + if err != nil { + return nil, fmt.Errorf("cannot marshal interface: %v", err) + } + return jsonpatch.CreatePatch(bytes, marshaledBytes) +} + // Not worth fully duck typing since there's no shared schema. type hasSpec struct { Spec json.RawMessage `json:"spec"` @@ -643,7 +669,7 @@ func hasChanged(ctx context.Context, old, new GenericCRD) (bool, error) { return true, nil } -func asGenerational(ctx context.Context, crd GenericCRD) (*duckv1alpha1.Generational, error) { +func asGenerational(_ context.Context, crd GenericCRD) (*duckv1alpha1.Generational, error) { raw, err := json.Marshal(crd) if err != nil { return nil, err