Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NET-5073 - ProxyConfiguration: implement various connection options #19187

Merged
merged 3 commits into from
Oct 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (b *Builder) buildExposePaths(workload *pbcatalog.Workload) {
buildListener()

b.addExposePathsRoute(exposePath, clusterName).
addLocalAppCluster(clusterName).
addLocalAppCluster(clusterName, nil).
addLocalAppStaticEndpoints(clusterName, exposePath.LocalPathPort)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ package builder
import (
"fmt"

pbmesh "github.com/hashicorp/consul/proto-public/pbmesh/v2beta1"
"google.golang.org/protobuf/types/known/wrapperspb"

"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/envoyextensions/xdscommon"
pbauth "github.com/hashicorp/consul/proto-public/pbauth/v2beta1"
Expand All @@ -31,13 +34,13 @@ func (b *Builder) BuildLocalApp(workload *pbcatalog.Workload, ctp *pbauth.Comput

if port.Protocol != pbcatalog.Protocol_PROTOCOL_MESH {
foundInboundNonMeshPorts = true
lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName]).
lb.addInboundRouter(clusterName, routeName, port, portName, trafficPermissions[portName], b.proxyCfg.GetDynamicConfig().GetInboundConnections()).
addInboundTLS()

if isL7(port.Protocol) {
b.addLocalAppRoute(routeName, clusterName)
b.addLocalAppRoute(routeName, clusterName, portName)
}
b.addLocalAppCluster(clusterName).
b.addLocalAppCluster(clusterName, &portName).
addLocalAppStaticEndpoints(clusterName, port.GetPort())
}
}
Expand Down Expand Up @@ -264,10 +267,16 @@ func (b *Builder) addInboundListener(name string, workload *pbcatalog.Workload)
// Add TLS inspection capability to be able to parse ALPN and/or SNI information from inbound connections.
listener.Capabilities = append(listener.Capabilities, pbproxystate.Capability_CAPABILITY_L4_TLS_INSPECTION)

if b.proxyCfg.GetDynamicConfig() != nil && b.proxyCfg.GetDynamicConfig().InboundConnections != nil {
listener.BalanceConnections = pbproxystate.BalanceConnections(b.proxyCfg.DynamicConfig.InboundConnections.BalanceInboundConnections)
}
return b.NewListenerBuilder(listener)
}

func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string, port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions) *ListenerBuilder {
func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
port *pbcatalog.WorkloadPort, portName string, tp *pbproxystate.TrafficPermissions,
ic *pbmesh.InboundConnectionsConfig) *ListenerBuilder {

if l.listener == nil {
return l
}
Expand All @@ -289,6 +298,15 @@ func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
AlpnProtocols: []string{getAlpnProtocolFromPortName(portName)},
},
}

if ic != nil {
// MaxInboundConnections is uint32 that is used on:
// - router destinations MaxInboundConnection (uint64).
// - cluster circuit breakers UpstreamLimits.MaxConnections (uint32).
// It is cast to a uint64 here similarly as it is to the proxystateconverter code.
r.GetL4().MaxInboundConnections = uint64(ic.MaxInboundConnections)
jmurret marked this conversation as resolved.
Show resolved Hide resolved
}

l.listener.Routers = append(l.listener.Routers, r)
} else if isL7(port.Protocol) {
r := &pbproxystate.Router{
Expand All @@ -308,6 +326,13 @@ func (l *ListenerBuilder) addInboundRouter(clusterName string, routeName string,
AlpnProtocols: []string{getAlpnProtocolFromPortName(portName)},
},
}

if ic != nil {
// MaxInboundConnections is cast to a uint64 here similarly as it is to the
// as the L4 case statement above and in proxystateconverter code.
r.GetL7().MaxInboundConnections = uint64(ic.MaxInboundConnections)
}

l.listener.Routers = append(l.listener.Routers, r)
}
return l
Expand Down Expand Up @@ -339,7 +364,7 @@ func getAlpnProtocolFromPortName(portName string) string {
return fmt.Sprintf("consul~%s", portName)
}

func (b *Builder) addLocalAppRoute(routeName string, clusterName string) {
func (b *Builder) addLocalAppRoute(routeName, clusterName, portName string) {
proxyRouteRule := &pbproxystate.RouteRule{
Match: &pbproxystate.RouteMatch{
PathMatch: &pbproxystate.PathMatch{
Expand All @@ -356,6 +381,18 @@ func (b *Builder) addLocalAppRoute(routeName string, clusterName string) {
},
},
}
if b.proxyCfg.GetDynamicConfig() != nil && b.proxyCfg.GetDynamicConfig().LocalConnection != nil {
lc, lcOK := b.proxyCfg.GetDynamicConfig().LocalConnection[portName]
if lcOK {
proxyRouteRule.Destination.DestinationConfiguration =
&pbproxystate.DestinationConfiguration{
TimeoutConfig: &pbproxystate.TimeoutConfig{
Timeout: lc.RequestTimeout,
},
}
}
}

// Each route name for the local app is listenerName:port since there is a route per port on the local app listener.
b.addRoute(routeName, &pbproxystate.Route{
VirtualHosts: []*pbproxystate.VirtualHost{{
Expand All @@ -373,9 +410,9 @@ func isL7(protocol pbcatalog.Protocol) bool {
return false
}

func (b *Builder) addLocalAppCluster(clusterName string) *Builder {
func (b *Builder) addLocalAppCluster(clusterName string, portName *string) *Builder {
// Make cluster for this router destination.
b.proxyStateTemplate.ProxyState.Clusters[clusterName] = &pbproxystate.Cluster{
cluster := &pbproxystate.Cluster{
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
Group: &pbproxystate.EndpointGroup_Static{
Expand All @@ -384,20 +421,34 @@ func (b *Builder) addLocalAppCluster(clusterName string) *Builder {
},
},
}

// configure inbound connections or connection timeout if either is defined
if b.proxyCfg.GetDynamicConfig() != nil && portName != nil {
lc, lcOK := b.proxyCfg.DynamicConfig.LocalConnection[*portName]

if lcOK || b.proxyCfg.DynamicConfig.InboundConnections != nil {
cluster.GetEndpointGroup().GetStatic().Config = &pbproxystate.StaticEndpointGroupConfig{}

if lcOK {
cluster.GetEndpointGroup().GetStatic().GetConfig().ConnectTimeout = lc.ConnectTimeout
}

if b.proxyCfg.DynamicConfig.InboundConnections != nil {
cluster.GetEndpointGroup().GetStatic().GetConfig().CircuitBreakers = &pbproxystate.CircuitBreakers{
UpstreamLimits: &pbproxystate.UpstreamLimits{
MaxConnections: &wrapperspb.UInt32Value{Value: b.proxyCfg.DynamicConfig.InboundConnections.MaxInboundConnections},
},
}
}
}
}

b.proxyStateTemplate.ProxyState.Clusters[clusterName] = cluster
jmurret marked this conversation as resolved.
Show resolved Hide resolved
return b
}

func (b *Builder) addBlackHoleCluster() *Builder {
b.proxyStateTemplate.ProxyState.Clusters[xdscommon.BlackHoleClusterName] = &pbproxystate.Cluster{
Group: &pbproxystate.Cluster_EndpointGroup{
EndpointGroup: &pbproxystate.EndpointGroup{
Group: &pbproxystate.EndpointGroup_Static{
Static: &pbproxystate.StaticEndpointGroup{},
},
},
},
}
return b
return b.addLocalAppCluster(xdscommon.BlackHoleClusterName, nil)
}

func (b *Builder) addLocalAppStaticEndpoints(clusterName string, port uint32) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
package builder

import (
"google.golang.org/protobuf/types/known/durationpb"
"sort"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -139,17 +142,80 @@ func TestBuildLocalApp_WithProxyConfiguration(t *testing.T) {
},
},
},
// source/local-and-inbound-connections shows that configuring LocalCOnnection
// and InboundConnections in DynamicConfig will set fields on standard clusters and routes,
// but will not set fields on exposed path clusters and routes.
"source/local-and-inbound-connections": {
workload: &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{
Host: "10.0.0.1",
},
},
Ports: map[string]*pbcatalog.WorkloadPort{
"port1": {Port: 8080, Protocol: pbcatalog.Protocol_PROTOCOL_TCP},
"port2": {Port: 20000, Protocol: pbcatalog.Protocol_PROTOCOL_MESH},
"port3": {Port: 8081, Protocol: pbcatalog.Protocol_PROTOCOL_HTTP},
},
},
proxyCfg: &pbmesh.ComputedProxyConfiguration{
DynamicConfig: &pbmesh.DynamicConfig{
LocalConnection: map[string]*pbmesh.ConnectionConfig{
"port1": {
ConnectTimeout: durationpb.New(6 * time.Second),
RequestTimeout: durationpb.New(7 * time.Second)},
"port3": {
ConnectTimeout: durationpb.New(8 * time.Second),
RequestTimeout: durationpb.New(9 * time.Second)},
},
InboundConnections: &pbmesh.InboundConnectionsConfig{
MaxInboundConnections: 123,
BalanceInboundConnections: pbmesh.BalanceConnections(pbproxystate.BalanceConnections_BALANCE_CONNECTIONS_EXACT),
},
ExposeConfig: &pbmesh.ExposeConfig{
ExposePaths: []*pbmesh.ExposePath{
{
ListenerPort: 1234,
Path: "/health",
LocalPathPort: 9090,
Protocol: pbmesh.ExposePathProtocol_EXPOSE_PATH_PROTOCOL_HTTP,
},
{
ListenerPort: 1235,
Path: "GetHealth",
LocalPathPort: 9091,
Protocol: pbmesh.ExposePathProtocol_EXPOSE_PATH_PROTOCOL_HTTP2,
},
},
},
},
},
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
proxyTmpl := New(testProxyStateTemplateID(), testIdentityRef(), "foo.consul", "dc1", true, c.proxyCfg).
BuildLocalApp(c.workload, nil).
Build()

// sort routers because of test flakes where order was flip flopping.
actualRouters := proxyTmpl.ProxyState.Listeners[0].Routers
sort.Slice(actualRouters, func(i, j int) bool {
return actualRouters[i].String() < actualRouters[j].String()
})

actual := protoToJSON(t, proxyTmpl)
expected := golden.Get(t, actual, name+".golden")
expected := JSONToProxyTemplate(t, golden.GetBytes(t, actual, name+".golden"))

require.JSONEq(t, expected, actual)
// sort routers on listener from golden file
expectedRouters := expected.ProxyState.Listeners[0].Routers
sort.Slice(expectedRouters, func(i, j int) bool {
return expectedRouters[i].String() < expectedRouters[j].String()
})

// convert back to json after sorting so that test output does not contain extraneous fields.
require.Equal(t, protoToJSON(t, expected), protoToJSON(t, proxyTmpl))
})
}
}
Expand Down
Loading