Skip to content

Commit

Permalink
feat: add channels to event lineage graph (#7745)
Browse files Browse the repository at this point in the history
Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Mar 6, 2024
1 parent 42da1e1 commit 1ae172e
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 11 deletions.
39 changes: 39 additions & 0 deletions pkg/graph/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package graph

import (
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
)

Expand Down Expand Up @@ -55,3 +56,41 @@ func (g *Graph) AddBroker(broker eventingv1.Broker) {

v.AddEdge(to, dest, NoTransform)
}

func (g *Graph) AddChannel(channel messagingv1.Channel) {
if channel.Kind == "" {
channel.Kind = "Channel"
}

ref := &duckv1.KReference{
Name: channel.Name,
Namespace: channel.Namespace,
APIVersion: "messaging.knative.dev/v1",
Kind: channel.Kind,
}
dest := &duckv1.Destination{Ref: ref}

v, ok := g.vertices[makeComparableDestination(dest)]
if !ok {
v = &Vertex{
self: dest,
}
g.vertices[makeComparableDestination(dest)] = v
}

if channel.Spec.Delivery == nil || channel.Spec.Delivery.DeadLetterSink == nil {
// no DLS, we are done
return
}

// channel has a DLS, we need to add an edge to that
to, ok := g.vertices[makeComparableDestination(channel.Spec.Delivery.DeadLetterSink)]
if !ok {
to = &Vertex{
self: channel.Spec.Delivery.DeadLetterSink,
}
g.vertices[makeComparableDestination(channel.Spec.Delivery.DeadLetterSink)] = to
}

v.AddEdge(to, dest, NoTransform)
}
125 changes: 114 additions & 11 deletions pkg/graph/constructor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
)
Expand Down Expand Up @@ -123,21 +124,123 @@ func TestAddBroker(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
g := NewGraph()
g.AddBroker(test.broker)
assert.Len(t, g.vertices, len(test.expected))
for k, expected := range test.expected {
actual, ok := g.vertices[k]
assert.True(t, ok)
// assert.Equal can't do equality on function values, which edges have, so we need to do a more complicated check
assert.Equal(t, actual.self, expected.self)
assert.Len(t, actual.inEdges, len(expected.inEdges))
assert.Subset(t, makeComparableEdges(actual.inEdges), makeComparableEdges(expected.inEdges))
assert.Len(t, actual.outEdges, len(expected.outEdges))
assert.Subset(t, makeComparableEdges(actual.outEdges), makeComparableEdges(expected.outEdges))
}
checkTestResult(t, g.vertices, test.expected)
})
}
}

func TestAddChannel(t *testing.T) {
channelWithEdge := &Vertex{
self: &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "my-channel",
Namespace: "default",
APIVersion: "messaging.knative.dev/v1",
Kind: "Channel",
},
},
}
destinationWithEdge := &Vertex{
self: &duckv1.Destination{
URI: sampleUri,
},
}
channelWithEdge.AddEdge(destinationWithEdge, &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "my-channel",
Namespace: "default",
APIVersion: "messaging.knative.dev/v1",
Kind: "Channel",
},
}, NoTransform)
tests := []struct {
name string
channel messagingv1.Channel
expected map[comparableDestination]*Vertex
}{
{
name: "no DLS",
channel: messagingv1.Channel{
ObjectMeta: metav1.ObjectMeta{
Name: "my-channel",
Namespace: "default",
},
},
expected: map[comparableDestination]*Vertex{
{
Ref: duckv1.KReference{
Name: "my-channel",
Namespace: "default",
APIVersion: "messaging.knative.dev/v1",
Kind: "Channel",
},
}: {
self: &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "my-channel",
Namespace: "default",
APIVersion: "messaging.knative.dev/v1",
Kind: "Channel",
},
},
},
},
}, {
name: "DLS",
channel: messagingv1.Channel{
ObjectMeta: metav1.ObjectMeta{
Name: "my-channel",
Namespace: "default",
},
Spec: messagingv1.ChannelSpec{
ChannelableSpec: eventingduckv1.ChannelableSpec{
Delivery: &eventingduckv1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
URI: sampleUri,
},
},
},
},
},
expected: map[comparableDestination]*Vertex{
{
Ref: duckv1.KReference{
Name: "my-channel",
Namespace: "default",
APIVersion: "messaging.knative.dev/v1",
Kind: "Channel",
},
}: channelWithEdge,
{
URI: *sampleUri,
}: destinationWithEdge,
},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
g := NewGraph()
g.AddChannel(test.channel)
checkTestResult(t, g.vertices, test.expected)
})
}
}

func checkTestResult(t *testing.T, actualVertices map[comparableDestination]*Vertex, expectedVertices map[comparableDestination]*Vertex) {
assert.Len(t, actualVertices, len(expectedVertices))
for k, expected := range expectedVertices {
actual, ok := actualVertices[k]
assert.True(t, ok)
// assert.Equal can't do equality on function values, which edges have, so we need to do a more complicated check
assert.Equal(t, actual.self, expected.self)
assert.Len(t, actual.inEdges, len(expected.inEdges))
assert.Subset(t, makeComparableEdges(actual.inEdges), makeComparableEdges(expected.inEdges))
assert.Len(t, actual.outEdges, len(expected.outEdges))
assert.Subset(t, makeComparableEdges(actual.outEdges), makeComparableEdges(expected.outEdges))
}
}

func makeComparableEdges(edges []*Edge) []comparableEdge {
res := make([]comparableEdge, len(edges))

Expand Down

0 comments on commit 1ae172e

Please sign in to comment.