Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(telemetry): Allow customizing polling from stats poller #1634

Merged
merged 2 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 25 additions & 14 deletions examples/grpc_vpp/stats_poller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package main

import (
"context"
"errors"
"fmt"
"io"
"log"
"net"
"os"
"time"

"github.com/namsral/flag"
"go.ligato.io/cn-infra/v2/agent"
"go.ligato.io/cn-infra/v2/infra"
"golang.org/x/net/context"
"google.golang.org/grpc"

"go.ligato.io/vpp-agent/v3/proto/ligato/configurator"
Expand All @@ -31,7 +35,9 @@ import (
var (
address = flag.String("address", "localhost:9111", "address of GRPC server")
socketType = flag.String("socket-type", "tcp", "[tcp, tcp4, tcp6, unix, unixpacket]")
period = flag.Uint("period", 3, "Polling period (in seconds)")

period = flag.Uint("period", 3, "Polling period (in seconds)")
polls = flag.Uint("polls", 0, "Number of pollings")
)

func main() {
Expand Down Expand Up @@ -59,7 +65,11 @@ func (p *ExamplePlugin) Init() (err error) {
// Set up connection to the server.
p.conn, err = grpc.Dial("unix",
grpc.WithInsecure(),
grpc.WithDialer(dialer(*socketType, *address, time.Second*3)))
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return net.DialTimeout(*socketType, *address, time.Second*3)
}),
//grpc.WithContextDialer(dialer(*socketType, *address, time.Second*3)),
)

if err != nil {
return err
Expand All @@ -75,13 +85,14 @@ func (p *ExamplePlugin) Init() (err error) {

// Get is an implementation of client-side statistics streaming.
func (p *ExamplePlugin) pollStats(client configurator.StatsPollerServiceClient) {
p.Log.Infof("Polling every %v seconds..", *period)
ctx := context.Background()

req := &configurator.PollStatsRequest{
PeriodSec: uint32(*period),
NumPolls: uint32(*polls),
}
fmt.Printf("Polling stats: %v\n", req)

ctx := context.Background()
stream, err := client.PollStats(ctx, req)
if err != nil {
p.Log.Fatalln("PollStats failed:", err)
Expand All @@ -90,26 +101,26 @@ func (p *ExamplePlugin) pollStats(client configurator.StatsPollerServiceClient)
var lastSeq uint32
for {
resp, err := stream.Recv()
if err != nil {
if errors.Is(err, io.EOF) {
p.Log.Infof("Polling has completed.")
os.Exit(0)
} else if err != nil {
p.Log.Fatalln("Recv failed:", err)
}

if resp.PollSeq != lastSeq {
p.Log.Infof(" --- Poll sequence: %-3v", resp.PollSeq)
fmt.Printf(" --- Poll sequence: %-3v\n", resp.PollSeq)
}
lastSeq = resp.PollSeq

vppStats := resp.GetStats().GetVppStats()
p.Log.Infof("VPP stats: %v", vppStats)
fmt.Printf("VPP stats: %v\n", vppStats)
}
}

// Dialer for unix domain socket
func dialer(socket, address string, timeoutVal time.Duration) func(string, time.Duration) (net.Conn, error) {
return func(addr string, timeout time.Duration) (net.Conn, error) {
// Pass values
addr, timeout = address, timeoutVal
// Dial with timeout
return net.DialTimeout(socket, addr, timeoutVal)
func dialer(socket, address string, timeoutVal time.Duration) func(context.Context, string) (net.Conn, error) {
return func(ctx context.Context, addr string) (net.Conn, error) {
return net.DialTimeout(socket, address, timeoutVal)
}
}
71 changes: 48 additions & 23 deletions plugins/telemetry/stats_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

govppapi "git.fd.io/govpp.git/api"
"github.com/golang/protobuf/proto"
"go.ligato.io/cn-infra/v2/logging"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -25,35 +26,26 @@ type statsPollerServer struct {
}

func (s *statsPollerServer) PollStats(req *configurator.PollStatsRequest, svr configurator.StatsPollerService_PollStatsServer) error {
var pollSeq uint32

if req.PeriodSec == 0 {
return status.Error(codes.InvalidArgument, "PeriodSec must be greater than 0")
if req.GetPeriodSec() == 0 && req.GetNumPolls() > 1 {
return status.Error(codes.InvalidArgument, "period must be > 0 if number of polls is > 1")
}
period := time.Duration(req.PeriodSec) * time.Second

tick := time.NewTicker(period)
defer tick.Stop()

s.log.Debugf("starting to poll stats every %v", period)
for {
pollSeq++
s.log.WithField("seq", pollSeq).Debugf("polling stats..")
ctx := svr.Context()

vppStatsCh := make(chan vpp.Stats)
var err error
streamStats := func(pollSeq uint32) (err error) {
vppStatsCh := make(chan *vpp.Stats)
go func() {
err = s.streamVppStats(vppStatsCh)
err = s.streamVppStats(ctx, vppStatsCh)
close(vppStatsCh)
}()
for vppStats := range vppStatsCh {
VppStats := vppStats
VppStats := proto.Clone(vppStats).(*vpp.Stats)
s.log.Debugf("sending vpp stats: %v", VppStats)

if err := svr.Send(&configurator.PollStatsResponse{
PollSeq: pollSeq,
Stats: &configurator.Stats{
Stats: &configurator.Stats_VppStats{VppStats: &VppStats},
Stats: &configurator.Stats_VppStats{VppStats: VppStats},
},
}); err != nil {
s.log.Errorf("sending stats failed: %v", err)
Expand All @@ -64,14 +56,41 @@ func (s *statsPollerServer) PollStats(req *configurator.PollStatsRequest, svr co
s.log.Errorf("polling vpp stats failed: %v", err)
return err
}
return nil
}

<-tick.C
if req.GetPeriodSec() == 0 {
return streamStats(0)
}
}

func (s *statsPollerServer) streamVppStats(ch chan vpp.Stats) error {
ctx := context.Background()
period := time.Duration(req.GetPeriodSec()) * time.Second
s.log.Debugf("start polling stats every %v", period)

tick := time.NewTicker(period)
defer tick.Stop()

for pollSeq := uint32(1); ; pollSeq++ {
s.log.WithField("seq", pollSeq).Debugf("polling stats..")

if err := streamStats(pollSeq); err != nil {
return err
}

if req.GetNumPolls() > 0 && pollSeq >= req.GetNumPolls() {
s.log.Debugf("reached %d pollings", req.GetNumPolls())
return nil
}

select {
case <-tick.C:
// period passed
case <-ctx.Done():
return ctx.Err()
}
}
}

func (s *statsPollerServer) streamVppStats(ctx context.Context, ch chan *vpp.Stats) error {
ifStats, err := s.handler.GetInterfaceStats(ctx)
if err != nil {
return err
Expand All @@ -87,7 +106,7 @@ func (s *statsPollerServer) streamVppStats(ch chan vpp.Stats) error {
// fallback to internal name
name = iface.InterfaceName
}
vppStats := vpp.Stats{
vppStats := &vpp.Stats{
Interface: &vpp_interfaces.InterfaceStats{
Name: name,
Rx: convertInterfaceCombined(iface.Rx),
Expand All @@ -109,7 +128,13 @@ func (s *statsPollerServer) streamVppStats(ch chan vpp.Stats) error {
Mpls: iface.Mpls,
},
}
ch <- vppStats

select {
case ch <- vppStats:
// stats sent
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
Expand Down
63 changes: 39 additions & 24 deletions proto/ligato/configurator/statspoller.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 9 additions & 2 deletions proto/ligato/configurator/statspoller.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,32 @@ option go_package = "go.ligato.io/vpp-agent/v3/proto/ligato/configurator;configu

import "ligato/vpp/vpp.proto";

// Stats defines stats data returned by StatsPollerService.
message Stats {
oneof stats {
vpp.Stats vpp_stats = 1;
}
}

message PollStatsRequest {
// PeriodSec defines polling period (in seconds)
// PeriodSec defines polling period (in seconds). Set to zero to
// return just single polling.
uint32 period_sec = 1;
// NumPolls defines number of pollings. Set to non-zero number to
// stop the polling after specified number of pollings is reached.
uint32 num_polls = 2;
}


message PollStatsResponse {
// PollSeq defines the sequence number of this polling response.
uint32 poll_seq = 1;
// Stats contains polled stats data.
Stats stats = 2;
}

// StatsPollerService provides operations for collecting statistics.
service StatsPollerService {
// PollStats is used for polling metrics using poll period.
// PollStats is used for polling stats with specific period and number of pollings.
rpc PollStats(PollStatsRequest) returns (stream PollStatsResponse) {};
}
Loading