Skip to content

Commit

Permalink
Merge pull request #4 from nachocano/filter-exp
Browse files Browse the repository at this point in the history
Trigger filters on event source and type
  • Loading branch information
Harwayne authored Feb 14, 2019
2 parents ae665bd + 29acf36 commit dbe650a
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 22 deletions.
5 changes: 3 additions & 2 deletions pkg/apis/eventing/v1alpha1/trigger_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func (ts *TriggerSpec) SetDefaults() {
if ts.Broker == "" {
ts.Broker = "default"
}
if ts.Type == "" {
ts.Type = "Any"
// Make a default filter that allows anything.
if ts.Filter == nil {
ts.Filter = &TriggerFilter{&TriggerFilterAttributes{Type: TriggerAnyFilter, Source: TriggerAnyFilter}}
}
}
15 changes: 14 additions & 1 deletion pkg/apis/eventing/v1alpha1/trigger_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,19 @@ type TriggerSpec struct {
// +optional
DeprecatedGeneration int64 `json:"generation,omitempty"`

Broker string `json:"broker,omitempty"`
Broker string `json:"broker,omitempty"`

// +optional
Filter *TriggerFilter `json:"filter,omitempty"`

Subscriber *SubscriberSpec `json:"subscriber,omitempty"`
}

type TriggerFilter struct {
ExactMatch *TriggerFilterAttributes `json:"exactMatch,omitempty"`
}

type TriggerFilterAttributes struct {
Type string `json:"type,omitempty"`
Source string `json:"source,omitempty"`
}
Expand Down Expand Up @@ -96,6 +106,9 @@ const (
TriggerConditionVirtualService duckv1alpha1.ConditionType = "VirtualService"

TriggerConditionSubscribed duckv1alpha1.ConditionType = "Subscribed"

// Constant to represent that we should allow anything.
TriggerAnyFilter = "Any"
)

// GetCondition returns the condition currently associated with the given type, or nil.
Expand Down
9 changes: 7 additions & 2 deletions pkg/apis/eventing/v1alpha1/trigger_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ func (ts *TriggerSpec) Validate() *apis.FieldError {
errs = errs.Also(fe)
}

if ts.Type == "" {
fe := apis.ErrMissingField("type")
if ts.Filter == nil {
fe := apis.ErrMissingField("filter")
errs = errs.Also(fe)
}

if ts.Filter.ExactMatch == nil {
fe := apis.ErrMissingField("filter.exactMatch")
errs = errs.Also(fe)
}

Expand Down
50 changes: 50 additions & 0 deletions pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 11 additions & 14 deletions pkg/broker/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
)

const (
Any = "Any"
)

// Receiver parses Cloud Events and sends them to GCP PubSub.
// Receiver parses Cloud Events and sends them to the channel.
type Receiver struct {
logger *zap.Logger
client client.Client
Expand All @@ -52,11 +48,11 @@ func New(logger *zap.Logger, client client.Client) (*Receiver, manager.Runnable)
}

func (r *Receiver) newMessageReceiver() *provisioners.MessageReceiver {
return provisioners.NewMessageReceiver(r.sendEventToTopic, r.logger.Sugar())
return provisioners.NewMessageReceiver(r.sendEvent, r.logger.Sugar())
}

// sendEventToTopic sends a message to the Cloud Pub/Sub Topic backing the Channel.
func (r *Receiver) sendEventToTopic(channel provisioners.ChannelReference, message *provisioners.Message) error {
// sendEvent sends a message to the Channel.
func (r *Receiver) sendEvent(channel provisioners.ChannelReference, message *provisioners.Message) error {
r.logger.Debug("received message")
ctx := context.Background()

Expand Down Expand Up @@ -97,14 +93,15 @@ func (r *Receiver) getTrigger(ctx context.Context, ref provisioners.ChannelRefer
return t, err
}

func (r *Receiver) shouldSendMessage(t *eventingv1alpha1.TriggerSpec, m *provisioners.Message) bool {
// TODO More filtering!
if t.Type != Any && t.Type != m.Headers["Ce-Eventtype"] {
r.logger.Debug("Wrong type", zap.String("trigger.spec.type", t.Type), zap.String("message.type", m.Headers["Ce-Eventtype"]), zap.Any("m", m))
func (r *Receiver) shouldSendMessage(ts *eventingv1alpha1.TriggerSpec, m *provisioners.Message) bool {
filterType := ts.Filter.ExactMatch.Type
if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != m.Headers["Ce-Eventtype"] {
r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.exactMatch.type", filterType), zap.String("message.type", m.Headers["Ce-Eventtype"]))
return false
}
if t.Source != "" && t.Source != m.Headers["Ce-Source"] {
r.logger.Debug("Wrong source", zap.String("trigger.spec.source", t.Source), zap.String("message.source", m.Headers["Ce-Source"]))
filterSource := ts.Filter.ExactMatch.Source
if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != m.Headers["Ce-Source"] {
r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.exactMatch.source", filterSource), zap.String("message.source", m.Headers["Ce-Source"]))
return false
}
return true
Expand Down
1 change: 0 additions & 1 deletion t.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ kind: Trigger
metadata:
name: t
spec:
type: Any
subscriber:
ref:
apiVersion: serving.knative.dev/v1alpha1
Expand Down
6 changes: 4 additions & 2 deletions t2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ metadata:
spec:
# Our Cloud Event parsing library seems to have a bug and forces to put type and source in double
# quotes (as it thinks the actual value is `"foo"`, not `foo`).
type: '"com.example.someevent"'
filter:
exactMatch:
type: '"com.example.someevent"'
source: '"/mycontext/subcontext"'
subscriber:
ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: message-dumper

0 comments on commit dbe650a

Please sign in to comment.