Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

veneur-proxy: Add option for GRPC streaming to veneur-global #2

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type ProxyConfig struct {
ForwardTimeout string `yaml:"forward_timeout"`
GrpcAddress string `yaml:"grpc_address"`
GrpcForwardAddress string `yaml:"grpc_forward_address"`
GrpcStream bool `yaml:"grpc_stream"`
HTTPAddress string `yaml:"http_address"`
IdleConnectionTimeout string `yaml:"idle_connection_timeout"`
IgnoreTags []matcher.TagMatcher `yaml:"ignore_tags"`
Expand Down
1 change: 1 addition & 0 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func NewProxyFromConfig(
proxysrv.WithIgnoredTags(proxy.ignoredTags),
proxysrv.WithLog(logrus.NewEntry(logger)),
proxysrv.WithTraceClient(proxy.TraceClient),
proxysrv.WithEnableStreaming(conf.GrpcStream),
)
if err != nil {
logger.WithError(err).Fatal("Failed to initialize the gRPC server")
Expand Down
6 changes: 6 additions & 0 deletions proxysrv/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,9 @@ func WithTraceClient(c *trace.Client) Option {
opts.traceClient = c
}
}

func WithEnableStreaming(streaming bool) Option {
return func(opts *options) {
opts.streaming = streaming
}
}
27 changes: 23 additions & 4 deletions proxysrv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type options struct {
traceClient *trace.Client
statsInterval time.Duration
ignoredTags []matcher.TagMatcher
streaming bool
}

// New creates a new Server with the provided destinations. The server returned
Expand Down Expand Up @@ -316,10 +317,28 @@ func (s *Server) forward(ctx context.Context, dest string, ms []*metricpb.Metric
}

c := forwardrpc.NewForwardClient(conn)
_, err = c.SendMetrics(ctx, &forwardrpc.MetricList{Metrics: ms})
if err != nil {
return fmt.Errorf("failed to send %d metrics over gRPC: %v",
len(ms), err)

if s.opts.streaming {
forwardStream, err := c.SendMetricsV2(ctx)
if err != nil {
return fmt.Errorf("failed to stream %d metrics over gRPC: %v",
len(ms), err)
}

defer forwardStream.CloseAndRecv()

for i, metric := range ms {
err := forwardStream.Send(metric)
if err != nil {
return fmt.Errorf("failed to stream (%d/%d) metrics over gRPC: %v", len(ms)-i, len(ms), err)
}
}
} else {
_, err = c.SendMetrics(ctx, &forwardrpc.MetricList{Metrics: ms})
if err != nil {
return fmt.Errorf("failed to send %d metrics over gRPC: %v",
len(ms), err)
}
}

_ = metrics.ReportBatch(s.opts.traceClient, ssf.RandomlySample(0.1,
Expand Down
117 changes: 62 additions & 55 deletions proxysrv/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,66 +159,73 @@ func TestCountActiveHandlers(t *testing.T) {
// Repeat this test for a couple different numbers of calls
for _, n := range []int{0, 1, 5, 10} {
n := n
t.Run(fmt.Sprintf("handlers=%d", n), func(t *testing.T) {
t.Parallel()
for _, protocol := range []string{"grpc", "grpc-stream"} {
t.Run(fmt.Sprintf("handlers=%d protocol=%s", n, protocol), func(t *testing.T) {
t.Parallel()

// Create some test servers that will block until explicitly stopped
done := make(chan struct{})
blocking := createTestForwardServers(t, 3, func(_ []*metricpb.Metric) {
<-done
})
defer stopTestForwardServers(blocking)
// put all of the servers into a ring
ring := consistent.New()
ring.Set(addrsFromServers(blocking))

metrics := &forwardrpc.MetricList{metrictest.RandomForwardMetrics(100)}

opts := []Option{WithStatsInterval(10 * time.Nanosecond)}
if protocol == "grpc-stream" {
opts = append(opts, WithEnableStreaming(true))
}
s := newServer(t, ring, opts...)

// Create some test servers that will block until explicitly stopped
done := make(chan struct{})
blocking := createTestForwardServers(t, 3, func(_ []*metricpb.Metric) {
<-done
})
defer stopTestForwardServers(blocking)
// put all of the servers into a ring
ring := consistent.New()
ring.Set(addrsFromServers(blocking))

metrics := &forwardrpc.MetricList{metrictest.RandomForwardMetrics(100)}
s := newServer(t, ring, WithStatsInterval(10*time.Nanosecond))

// Make the specified number of calls, all of these should spawn
// goroutines that will block
for i := 0; i < n; i++ {
s.SendMetrics(context.Background(), metrics)
}
// Make the specified number of calls, all of these should spawn
// goroutines that will block
for i := 0; i < n; i++ {
s.SendMetrics(context.Background(), metrics)
}

// Since the goroutines are forked immediately after the function
// call, it might take a bit of time for all of them to start.
// We should wait for a little bit
tick := time.NewTicker(10 * time.Nanosecond)
defer tick.Stop()

timeout := time.NewTicker(10 * time.Second)
defer timeout.Stop()
for int64(n) != atomic.LoadInt64(s.activeProxyHandlers) {
select {
case <-tick.C:
// Report statistics, just to exercise the funtion. This
// would normally be called by the server periodically
s.reportStats()
case <-timeout.C:
assert.Failf(t, "The count of active proxy handlers didn't increase enough before the timeout",
"Expected: %d\tCurrent: %d", n, atomic.LoadInt64(s.activeProxyHandlers))
return
// Since the goroutines are forked immediately after the function
// call, it might take a bit of time for all of them to start.
// We should wait for a little bit
tick := time.NewTicker(10 * time.Nanosecond)
defer tick.Stop()

timeout := time.NewTicker(10 * time.Second)
defer timeout.Stop()
for int64(n) != atomic.LoadInt64(s.activeProxyHandlers) {
select {
case <-tick.C:
// Report statistics, just to exercise the funtion. This
// would normally be called by the server periodically
s.reportStats()
case <-timeout.C:
assert.Failf(t, "The count of active proxy handlers didn't increase enough before the timeout",
"Expected: %d\tCurrent: %d", n, atomic.LoadInt64(s.activeProxyHandlers))
return
}
}
}

// Stop all of the servers and check that the counter goes to zero
close(done)
timeout = time.NewTicker(10 * time.Second)
defer timeout.Stop()
for atomic.LoadInt64(s.activeProxyHandlers) != 0 {
select {
case <-tick.C:
// Report statistics, just to exercise the funtion. This
// would normally be called by the server periodically
s.reportStats()
case <-timeout.C:
assert.Failf(t, "The count of active proxy handlers didn't drop to zero",
"Current: %d", atomic.LoadInt64(s.activeProxyHandlers))
return
// Stop all of the servers and check that the counter goes to zero
close(done)
timeout = time.NewTicker(10 * time.Second)
defer timeout.Stop()
for atomic.LoadInt64(s.activeProxyHandlers) != 0 {
select {
case <-tick.C:
// Report statistics, just to exercise the funtion. This
// would normally be called by the server periodically
s.reportStats()
case <-timeout.C:
assert.Failf(t, "The count of active proxy handlers didn't drop to zero",
"Current: %d", atomic.LoadInt64(s.activeProxyHandlers))
return
}
}
}
})
})
}
}
}

Expand Down