Skip to content

Commit

Permalink
feat(telemetry): Allow customizing polling from stats poller (#1634)
Browse files Browse the repository at this point in the history
- poll requests with period=0 return single stats reading
- poll requests can define number of polls to return

Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
  • Loading branch information
ondrej-fabry authored Mar 4, 2020
1 parent 9a4f14a commit e88f3bd
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 63 deletions.
39 changes: 25 additions & 14 deletions examples/grpc_vpp/stats_poller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package main

import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"time"

"github.com/namsral/flag"
"go.ligato.io/cn-infra/v2/agent"
"go.ligato.io/cn-infra/v2/infra"
"golang.org/x/net/context"
"google.golang.org/grpc"

"go.ligato.io/vpp-agent/v3/proto/ligato/configurator"
Expand All @@ -31,7 +35,9 @@ import (
var (
address = flag.String("address", "localhost:9111", "address of GRPC server")
socketType = flag.String("socket-type", "tcp", "[tcp, tcp4, tcp6, unix, unixpacket]")
period = flag.Uint("period", 3, "Polling period (in seconds)")

period = flag.Uint("period", 3, "Polling period (in seconds)")
polls = flag.Uint("polls", 0, "Number of pollings")
)

func main() {
Expand Down Expand Up @@ -59,7 +65,11 @@ func (p *ExamplePlugin) Init() (err error) {
// Set up connection to the server.
p.conn, err = grpc.Dial("unix",
grpc.WithInsecure(),
grpc.WithDialer(dialer(*socketType, *address, time.Second*3)))
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return net.DialTimeout(*socketType, *address, time.Second*3)
}),
//grpc.WithContextDialer(dialer(*socketType, *address, time.Second*3)),
)

if err != nil {
return err
Expand All @@ -75,13 +85,14 @@ func (p *ExamplePlugin) Init() (err error) {

// Get is an implementation of client-side statistics streaming.
func (p *ExamplePlugin) pollStats(client configurator.StatsPollerServiceClient) {
p.Log.Infof("Polling every %v seconds..", *period)
ctx := context.Background()

req := &configurator.PollStatsRequest{
PeriodSec: uint32(*period),
NumPolls: uint32(*polls),
}
fmt.Printf("Polling stats: %v\n", req)

ctx := context.Background()
stream, err := client.PollStats(ctx, req)
if err != nil {
p.Log.Fatalln("PollStats failed:", err)
Expand All @@ -90,26 +101,26 @@ func (p *ExamplePlugin) pollStats(client configurator.StatsPollerServiceClient)
var lastSeq uint32
for {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
p.Log.Infof("Polling has completed.")
os.Exit(0)
} else if err != nil {
p.Log.Fatalln("Recv failed:", err)
}

if resp.PollSeq != lastSeq {
p.Log.Infof(" --- Poll sequence: %-3v", resp.PollSeq)
fmt.Printf(" --- Poll sequence: %-3v\n", resp.PollSeq)
}
lastSeq = resp.PollSeq

vppStats := resp.GetStats().GetVppStats()
p.Log.Infof("VPP stats: %v", vppStats)
fmt.Printf("VPP stats: %v\n", vppStats)
}
}

// Dialer for unix domain socket
func dialer(socket, address string, timeoutVal time.Duration) func(string, time.Duration) (net.Conn, error) {
return func(addr string, timeout time.Duration) (net.Conn, error) {
// Pass values
addr, timeout = address, timeoutVal
// Dial with timeout
return net.DialTimeout(socket, addr, timeoutVal)
func dialer(socket, address string, timeoutVal time.Duration) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
return net.DialTimeout(socket, address, timeoutVal)
}
}
71 changes: 48 additions & 23 deletions plugins/telemetry/stats_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

govppapi "git.fd.io/govpp.git/api"
"github.com/golang/protobuf/proto"
"go.ligato.io/cn-infra/v2/logging"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -25,35 +26,26 @@ type statsPollerServer struct {
}

func (s *statsPollerServer) PollStats(req *configurator.PollStatsRequest, svr configurator.StatsPollerService_PollStatsServer) error {
var pollSeq uint32

if req.PeriodSec == 0 {
return status.Error(codes.InvalidArgument, "PeriodSec must be greater than 0")
if req.GetPeriodSec() == 0 && req.GetNumPolls() > 1 {
return status.Error(codes.InvalidArgument, "period must be > 0 if number of polls is > 1")
}
period := time.Duration(req.PeriodSec) * time.Second

tick := time.NewTicker(period)
defer tick.Stop()

s.log.Debugf("starting to poll stats every %v", period)
for {
pollSeq++
s.log.WithField("seq", pollSeq).Debugf("polling stats..")
ctx := svr.Context()

vppStatsCh := make(chan vpp.Stats)
var err error
streamStats := func(pollSeq uint32) (err error) {
vppStatsCh := make(chan *vpp.Stats)
go func() {
err = s.streamVppStats(vppStatsCh)
err = s.streamVppStats(ctx, vppStatsCh)
close(vppStatsCh)
}()
for vppStats := range vppStatsCh {
VppStats := vppStats
VppStats := proto.Clone(vppStats).(*vpp.Stats)
s.log.Debugf("sending vpp stats: %v", VppStats)

if err := svr.Send(&configurator.PollStatsResponse{
PollSeq: pollSeq,
Stats: &configurator.Stats{
Stats: &configurator.Stats_VppStats{VppStats: &VppStats},
Stats: &configurator.Stats_VppStats{VppStats: VppStats},
},
}); err != nil {
s.log.Errorf("sending stats failed: %v", err)
Expand All @@ -64,14 +56,41 @@ func (s *statsPollerServer) PollStats(req *configurator.PollStatsRequest, svr co
s.log.Errorf("polling vpp stats failed: %v", err)
return err
}
return nil
}

<-tick.C
if req.GetPeriodSec() == 0 {
return streamStats(0)
}
}

func (s *statsPollerServer) streamVppStats(ch chan vpp.Stats) error {
ctx := context.Background()
period := time.Duration(req.GetPeriodSec()) * time.Second
s.log.Debugf("start polling stats every %v", period)

tick := time.NewTicker(period)
defer tick.Stop()

for pollSeq := uint32(1); ; pollSeq++ {
s.log.WithField("seq", pollSeq).Debugf("polling stats..")

if err := streamStats(pollSeq); err != nil {
return err
}

if req.GetNumPolls() > 0 && pollSeq >= req.GetNumPolls() {
s.log.Debugf("reached %d pollings", req.GetNumPolls())
return nil
}

select {
case <-tick.C:
// period passed
case <-ctx.Done():
return ctx.Err()
}
}
}

func (s *statsPollerServer) streamVppStats(ctx context.Context, ch chan *vpp.Stats) error {
ifStats, err := s.handler.GetInterfaceStats(ctx)
if err != nil {
return err
Expand All @@ -87,7 +106,7 @@ func (s *statsPollerServer) streamVppStats(ch chan vpp.Stats) error {
// fallback to internal name
name = iface.InterfaceName
}
vppStats := vpp.Stats{
vppStats := &vpp.Stats{
Interface: &vpp_interfaces.InterfaceStats{
Name: name,
Rx: convertInterfaceCombined(iface.Rx),
Expand All @@ -109,7 +128,13 @@ func (s *statsPollerServer) streamVppStats(ch chan vpp.Stats) error {
Mpls: iface.Mpls,
},
}
ch <- vppStats

select {
case ch <- vppStats:
// stats sent
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
Expand Down
63 changes: 39 additions & 24 deletions proto/ligato/configurator/statspoller.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions proto/ligato/configurator/statspoller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,32 @@ option go_package = "go.ligato.io/vpp-agent/v3/proto/ligato/configurator;configu

import "ligato/vpp/vpp.proto";

// Stats defines stats data returned by StatsPollerService.
message Stats {
oneof stats {
vpp.Stats vpp_stats = 1;
}
}

message PollStatsRequest {
// PeriodSec defines polling period (in seconds)
// PeriodSec defines polling period (in seconds). Set to zero to
// return just single polling.
uint32 period_sec = 1;
// NumPolls defines number of pollings. Set to non-zero number to
// stop the polling after specified number of pollings is reached.
uint32 num_polls = 2;
}


message PollStatsResponse {
// PollSeq defines the sequence number of this polling response.
uint32 poll_seq = 1;
// Stats contains polled stats data.
Stats stats = 2;
}

// StatsPollerService provides operations for collecting statistics.
service StatsPollerService {
// PollStats is used for polling metrics using poll period.
// PollStats is used for polling stats with specific period and number of pollings.
rpc PollStats(PollStatsRequest) returns (stream PollStatsResponse) {};
}
Loading

0 comments on commit e88f3bd

Please sign in to comment.