Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support: nodeselector in apiserversource #7584

Merged
merged 9 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions pkg/apis/feature/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,25 @@ func (e Flags) String() string {
return fmt.Sprintf("%+v", map[string]Flag(e))
}

func (e Flags) NodeSelector() map[string]string {
// Check if NodeSelector is not nil
if e == nil {
return map[string]string{}
}

nodeSelectorMap := make(map[string]string)

for k, v := range e {
if strings.Contains(k, NodeSelectorLabel) {
key := strings.TrimPrefix(k, NodeSelectorLabel)
value := strings.TrimSpace(string(v))
nodeSelectorMap[key] = value
}
}

return nodeSelectorMap
}

// NewFlagsConfigFromMap creates a Flags from the supplied Map
func NewFlagsConfigFromMap(data map[string]string) (Flags, error) {
flags := newDefaults()
Expand All @@ -122,6 +141,8 @@ func NewFlagsConfigFromMap(data map[string]string) (Flags, error) {
flags[sanitizedKey] = Permissive
} else if k == TransportEncryption && strings.EqualFold(v, string(Strict)) {
flags[sanitizedKey] = Strict
} else if strings.Contains(k, NodeSelectorLabel) {
flags[sanitizedKey] = Flag(v)
} else {
return flags, fmt.Errorf("cannot parse the feature flag '%s' = '%s'", k, v)
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/apis/feature/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ func TestGetFlags(t *testing.T) {
require.True(t, flags.IsAllowed("my-enabled-flag"))
require.True(t, flags.IsAllowed("my-allowed-flag"))
require.False(t, flags.IsAllowed("non-disabled-flag"))

nodeSelector := flags.NodeSelector()
expectedNodeSelector := map[string]string{"testkey": "testvalue", "testkey1": "testvalue1", "testkey2": "testvalue2"}
require.Equal(t, expectedNodeSelector, nodeSelector)
}

func TestShouldNotOverrideDefaults(t *testing.T) {

f, err := NewFlagsConfigFromMap(map[string]string{})
require.Nil(t, err)
require.NotNil(t, f)
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ const (
TransportEncryption = "transport-encryption"
EvenTypeAutoCreate = "eventtype-auto-create"
OIDCAuthentication = "authentication-oidc"
NodeSelectorLabel = "apiserversources.nodeselector."
)
3 changes: 3 additions & 0 deletions pkg/apis/feature/testdata/config-features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ data:
my-enabled-flag: "enabled"
my-disabled-flag: "disabled"
my-allowed-flag: "allowed"
apiserversources.nodeselector.testkey: testvalue
apiserversources.nodeselector.testkey1: testvalue1
apiserversources.nodeselector.testkey2: testvalue2
9 changes: 4 additions & 5 deletions pkg/reconciler/apiserversource/apiserversource.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour
// no Namespace defined in dest.Ref, we will use the Namespace of the source
// as the Namespace of dest.Ref.
if dest.Ref.Namespace == "" {
//TODO how does this work with deprecated fields
// TODO how does this work with deprecated fields
dest.Ref.Namespace = source.GetNamespace()
}
}
Expand All @@ -118,7 +118,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1.ApiServerSour
if featureFlags.IsOIDCAuthentication() {
// Create the role
err := r.createOIDCRole(ctx, source)

if err != nil {
logging.FromContext(ctx).Errorw("Failed when creating the OIDC Role for ApiServerSource", zap.Error(err))
return err
Expand Down Expand Up @@ -225,6 +224,8 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer
// return nil, err
// }

featureFlags := feature.FromContext(ctx)

adapterArgs := resources.ReceiveAdapterArgs{
Image: r.receiveAdapterImage,
Source: src,
Expand All @@ -235,6 +236,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1.ApiServer
Configs: r.configs,
Namespaces: namespaces,
AllNamespaces: allNamespaces,
NodeSelector: featureFlags.NodeSelector(),
}

expected, err := resources.MakeReceiveAdapter(&adapterArgs)
Expand Down Expand Up @@ -357,7 +359,6 @@ func (r *Reconciler) runAccessCheck(ctx context.Context, src *v1.ApiServerSource

src.Status.MarkNoSufficientPermissions(lastReason, "User %s cannot %s", user, missing)
return fmt.Errorf("insufficient permissions: User %s cannot %s", user, missing)

}

func (r *Reconciler) createCloudEventAttributes(src *v1.ApiServerSource) ([]duckv1.CloudEventAttributes, error) {
Expand Down Expand Up @@ -385,7 +386,6 @@ func (r *Reconciler) createOIDCRole(ctx context.Context, source *v1.ApiServerSou
roleName := resources.GetOIDCTokenRoleName(source.Name)

expected, err := resources.MakeOIDCRole(source)

if err != nil {
return fmt.Errorf("Cannot create OIDC role for ApiServerSource %s/%s: %w", source.GetName(), source.GetNamespace(), err)
}
Expand Down Expand Up @@ -417,7 +417,6 @@ func (r *Reconciler) createOIDCRole(ctx context.Context, source *v1.ApiServerSou
}

return nil

}

// createOIDCRoleBinding: this function will call resources package to get the rolebinding object
Expand Down
104 changes: 104 additions & 0 deletions pkg/reconciler/apiserversource/apiserversource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,75 @@ func TestReconcile(t *testing.T) {
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
}, {
Name: "Valid with nodeSelector",

Ctx: feature.ToContext(context.Background(), feature.Flags{
"apiserversources.nodeselector.testkey1": "testvalue1",
"apiserversources.nodeselector.testkey2": "testvalue2",
}),
Objects: []runtime.Object{
rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Resources: []sourcesv1.APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Namespace",
}},
SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
}),
rttestingv1.WithApiServerSourceUID(sourceUID),
rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
),
rttestingv1.NewChannel(sinkName, testNS,
rttestingv1.WithInitChannelConditions,
rttestingv1.WithChannelAddress(sinkAddressable),
),
makeAvailableReceiveAdapter(t),
},
Key: testNS + "/" + sourceName,
WantStatusUpdates: []clientgotesting.UpdateActionImpl{{
Object: rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Resources: []sourcesv1.APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Namespace",
}},
SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
}),
rttestingv1.WithApiServerSourceUID(sourceUID),
rttestingv1.WithApiServerSourceObjectMetaGeneration(generation),
// Status Update:
rttestingv1.WithInitApiServerSourceConditions,
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkURI),
rttestingv1.WithApiServerSourceSufficientPermissions,
rttestingv1.WithApiServerSourceReferenceModeEventTypes(source),
rttestingv1.WithApiServerSourceStatusObservedGeneration(generation),
rttestingv1.WithApiServerSourceStatusNamespaces([]string{testNS}),
rttestingv1.WithApiServerSourceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(),
),
}},
WantCreates: []runtime.Object{
makeSubjectAccessReview("namespaces", "get", "default"),
makeSubjectAccessReview("namespaces", "list", "default"),
makeSubjectAccessReview("namespaces", "watch", "default"),
},

WantUpdates: []clientgotesting.UpdateActionImpl{{
Object: makeAvailableReceiveAdapterWithNodeSelector(t, map[string]string{
"testkey1": "testvalue1",
"testkey2": "testvalue2",
}),
}},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", sourceName),
Eventf(corev1.EventTypeNormal, "ApiServerSourceDeploymentUpdated", `Deployment "apiserversource-test-apiserver-source-1234" updated`),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(sourceName, testNS),
},
WithReactors: []clientgotesting.ReactionFunc{subjectAccessReviewCreateReactor(true)},
SkipNamespaceValidation: true, // SubjectAccessReview objects are cluster-scoped.
},
}

Expand Down Expand Up @@ -1445,3 +1514,38 @@ func makeApiServerSourceOIDCServiceAccountWithoutOwnerRef() *corev1.ServiceAccou

return sa
}

func makeAvailableReceiveAdapterWithNodeSelector(t *testing.T, selector map[string]string) *appsv1.Deployment {
t.Helper()

src := rttestingv1.NewApiServerSource(sourceName, testNS,
rttestingv1.WithApiServerSourceSpec(sourcesv1.ApiServerSourceSpec{
Resources: []sourcesv1.APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Namespace",
}},
SourceSpec: duckv1.SourceSpec{Sink: sinkDest},
}),
rttestingv1.WithApiServerSourceUID(sourceUID),
// Status Update:
rttestingv1.WithInitApiServerSourceConditions,
rttestingv1.WithApiServerSourceDeployed,
rttestingv1.WithApiServerSourceSink(sinkURI),
)

args := resources.ReceiveAdapterArgs{
Image: image,
Source: src,
Labels: resources.Labels(sourceName),
SinkURI: sinkURI.String(),
Configs: &reconcilersource.EmptyVarsGenerator{},
NodeSelector: selector,
Namespaces: []string{testNS},
}

ra, err := resources.MakeReceiveAdapter(&args)
require.NoError(t, err)

rttesting.WithDeploymentAvailable()(ra)
return ra
}
45 changes: 24 additions & 21 deletions pkg/reconciler/apiserversource/resources/receive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ReceiveAdapterArgs struct {
Configs reconcilersource.ConfigAccessor
Namespaces []string
AllNamespaces bool
NodeSelector map[string]string
}

// MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for
Expand Down Expand Up @@ -82,6 +83,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) (*appsv1.Deployment, error) {
Labels: args.Labels,
},
Spec: corev1.PodSpec{
NodeSelector: args.NodeSelector,
ServiceAccountName: args.Source.Spec.ServiceAccountName,
EnableServiceLinks: ptr.Bool(false),
Containers: []corev1.Container{
Expand Down Expand Up @@ -149,29 +151,30 @@ func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) {
config = string(b)
}

envs := []corev1.EnvVar{{
Name: adapter.EnvConfigSink,
Value: args.SinkURI,
}, {
Name: "K_SOURCE_CONFIG",
Value: config,
}, {
Name: "SYSTEM_NAMESPACE",
Value: system.Namespace(),
}, {
Name: adapter.EnvConfigNamespace,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
envs := []corev1.EnvVar{
{
Name: adapter.EnvConfigSink,
Value: args.SinkURI,
}, {
Name: "K_SOURCE_CONFIG",
Value: config,
}, {
Name: "SYSTEM_NAMESPACE",
Value: system.Namespace(),
}, {
Name: adapter.EnvConfigNamespace,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "metadata.namespace",
},
},
}, {
Name: adapter.EnvConfigName,
Value: args.Source.Name,
}, {
Name: "METRICS_DOMAIN",
Value: "knative.dev/eventing",
},
}, {
Name: adapter.EnvConfigName,
Value: args.Source.Name,
}, {
Name: "METRICS_DOMAIN",
Value: "knative.dev/eventing",
},
}

if args.CACerts != nil {
Expand Down
Loading