diff --git a/CHANGELOG.md b/CHANGELOG.md index d67fce335..b264f38d2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,11 +1,16 @@ +# 14.1.0, (WIP) + +## Added +* The ability to emit dogstatsd metrics from veneur-emit over plain TCP in addition to the current plain UDP. Thanks, [shrivu-stripe](https://github.com/shrivu-stripe)! +* A config option and health checking for beginning support of emitting/receiving metrics via gRPC. Thanks, [eriwo-stripe](https://github.com/eriwo-stripe)! +* A gRPC server that listens for SSF spans and dogstatsd metrics on grpc_listening_addresses. Thanks [eriwo-stripe](https://github.com/eriwo-stripe) and [shrivu-stripe](https://github.com/shrivu-stripe)! +* The ability to emit metrics from veneur-emit via the gRPC protocol as well as the option to specify a proxy for those metrics. Thanks [eriwo-stripe](https://github.com/eriwo-stripe) and [shrivu-stripe](https://github.com/shrivu-stripe)! + # 14.0.0, 2020-01-14 ## Updated * Migrated from dep to Go modules. Clients must now use the updated import path `github.com/stripe/veneur/v14`. Thanks, [andybons](https://github.com/andybons)! -## Added -* A config option and health checking for beginning support of emitting/receiving metrics via gRPC. Thanks, [eriwo-stripe](https://github.com/eriwo-stripe)! - # 13.0.0, 2020-01-05 ## Added diff --git a/cmd/veneur-emit/README.md b/cmd/veneur-emit/README.md index 320cb548f..7f2378c1f 100644 --- a/cmd/veneur-emit/README.md +++ b/cmd/veneur-emit/README.md @@ -42,20 +42,24 @@ Usage of veneur-emit: Add timestamp to the event. Default is the current Unix epoch timestamp. -e_title string Title of event. Ex: 'An exception occurred' * + -error + Mark the reported span as having errored -gauge float Report a 'gauge' metric. Value must be float64. + -grpc + Sets the emitting protocal to gRPC. This cannot be used with "udp" hostports. -hostport string Address of destination (hostport or listening address URL). -indicator Mark the reported span as an indicator span - -error - Mark the reported span as having errored -mode string Mode for veneur-emit. Must be one of: 'metric', 'event', 'sc'. (default "metric") -name string Name of metric to report. Ex: 'daemontools.service.starts' -parent_span_id int ID of the parent span. + -proxy string + Address or URL to be used for proxying gRPC requests. The request will be sent to the proxy with the "hostport" attached so the proxy can forward the request. This can only be used with the "-grpc" flag. -sc_hostname string Add hostname to the event. -sc_msg string @@ -94,6 +98,59 @@ instance. ## Dogstatsd mode +### Dogstatsd Protocols + +We currently support **TCP** `(tcp://)`, **UDP** `(udp://)`, **Unix Socket** `(unix://)`, and **gRPC** `(tcp://)` protocols for emitting in dogstatsd. + +#### UDP protocol: + +We support the UDP protocol as part of the `-hostport` argument. This is the default for hostports that don't provide a protocol. + +**Example command:** + +```sh +veneur-emit -hostport udp://127.0.0.1:8200 -name ... +``` + +**Example command without specified protocol (uses UDP default):** +```sh +veneur-emit -hostport 127.0.0.1:8200 -name ... +``` + +#### TCP protocol: + +We support the TCP protocol as part of the `-hostport` argument. + +**Example command:** + +```sh +veneur-emit -hostport tcp://127.0.0.1:8200 -name ... +``` + +#### Unix Socket protocol: + +We support Unix Sockets as part of the `-hostport` argument. + +**Example command:** + +```sh +veneur-emit -hostport unix:///var/run/veneur/dogstats.sock -name ... +``` + +#### gRPC protocol: + +We support the gRPC protocol through the `-grpc` flag. When this flag is provided, the `-hostport` argument is treated as the URI of a Veneur instance serving gRPC requests. + +**Example command:** + +```sh +veneur-emit -hostport tcp://127.0.0.1:8200 -grpc -name ... +``` + +### Dogstatsd Examples + +All of the below commands can be used with the `-grpc` flag to send them over gRPC. + Increment a counter in dogstatsd mode: ```sh @@ -128,7 +185,48 @@ veneur-emit -hostport udp://127.0.0.1:8200 -name some.set.metric -set customer_a In SSF mode, veneur-emit will construct and submit an SSF span with optional metrics. SSF mode does not yet support events or service -checks. +checks. SSF mode is not the default and needs to be enabled by providing the `-ssf` flag. + +### SSF Protocols + +Currently we support **UDP** `(udp://)`, **Unix Socket** `(unix://)`, and **gRPC** `(tcp://)` protocols for emitting in SSF. + +### UDP protocol: + +We support the UDP protocol as part of the `-hostport` argument. This is the default for hostports that don't provide a protocol. + +**Example command:** + +```sh +veneur-emit -ssf -hostport udp://127.0.0.1:8200 -name ... +``` + +**Example command without specified protocol (uses UDP default):** +```sh +veneur-emit -ssf -hostport 127.0.0.1:8200 -name ... +``` + +#### Unix Socket protocol: + +We support Unix Sockets as part of the `-hostport` argument. + +**Example command:** + +```sh +veneur-emit -ssf -hostport unix:///var/run/veneur/ssf.sock -name .... +``` + +#### gRPC protocol: + +We support the gRPC protocol through the `-grpc` flag. When this flag is provided, we treat the `-hostport` argument as a TCP address or URL for the underlying gRPC connection. + +**Example command:** + +```sh +veneur-emit -ssf -hostport tcp://127.0.0.1:8200 -grpc -name ... +``` + +### SSF Examples Increment a counter in SSF mode: @@ -157,3 +255,23 @@ were passed in: ```sh veneur-emit -ssf -hostport unix:///var/run/veneur/ssf.sock -name some.command.timer -command not_a_real_command ``` + +## gRPC + +[gRPC](https://grpc.io/) is protocol for making remote service calls. In Veneur we use this to support emitting/receiving metrics over HTTP. +More details about our support of gRPC can be found below: + +### Supported Modes + +We currently support all emitting modes over gRPC. This means dogstatsd (metrics, events, service checks) and ssf spans. +To enable gRPC, just add the `-grpc` flag to a command and it will send with the gRPC protocol. +NOTE that gRPC uses TCP as its underlying protocol which makes it incompatible with UDP addresses. + +### Specifying a Proxy + +When using gRPC, you can also specify a proxy address/URL with the `-proxy` argument. +When you provide a proxy address, Veneur will send the request to the proxy and attach the `-hostport` argument so the proxy can forward the request. +The [go http proxy](https://golang.org/pkg/vendor/golang.org/x/net/http/httpproxy/) is an example of a supported proxy. +Unix Sockets, TCP addresses, and URLs are supported values for the proxy arguments. + + diff --git a/cmd/veneur-emit/main.go b/cmd/veneur-emit/main.go index 0275d3972..4a592170f 100644 --- a/cmd/veneur-emit/main.go +++ b/cmd/veneur-emit/main.go @@ -13,6 +13,8 @@ import ( "syscall" "time" + "golang.org/x/net/context" + "fmt" "strconv" @@ -22,8 +24,10 @@ import ( "github.com/araddon/dateparse" "github.com/sirupsen/logrus" "github.com/stripe/veneur/v14/protocol" + "github.com/stripe/veneur/v14/protocol/dogstatsd" "github.com/stripe/veneur/v14/ssf" "github.com/stripe/veneur/v14/trace" + "google.golang.org/grpc" ) type EmitMode uint @@ -33,6 +37,7 @@ type Flags struct { Mode string Debug bool Command bool + Proxy string ExtraArgs []string Name string @@ -42,6 +47,7 @@ type Flags struct { Set string Tag string ToSSF bool + ToGrpc bool Event struct { Title string @@ -176,49 +182,96 @@ func Main(args []string) int { validateFlagCombinations(passedFlags, flagStruct.ExtraArgs) - addr, netAddr, err := destination(flagStruct.HostPort, flagStruct.ToSSF) + addr, netAddr, err := destination(flagStruct.HostPort, flagStruct.ToGrpc) + proxy, proxyAddr, proxyErr := destination(flagStruct.Proxy, false) if err != nil { - logrus.WithError(err).Error("Error getting destination address.") + logrus.WithError(err).Error("Error encountered while resolving destination address.") return 1 } - logrus.WithField("net", netAddr.Network()). - WithField("addr", netAddr.String()). - WithField("ssf", flagStruct.ToSSF). + + if flagStruct.Proxy != "" && proxyErr != nil { + logrus.WithError(err).Error("Error encountered while resolving proxy destination address.") + return 1 + } + + logrus.WithField("addr", addr). + WithField("net", netAddr.Network()). + WithField("destination", flagStruct.HostPort). + WithField("grpc", flagStruct.ToGrpc). Debugf("destination") - if flagStruct.Mode == "event" { + if flagStruct.Proxy != "" { + logrus.WithField("addr", proxy). + WithField("net", proxyAddr.Network()). + WithField("proxy", flagStruct.Proxy). + Debugf("proxy") + } + + // We do "special" emitting for events and service checks. + // If we are doing gRPC then we just use the datadogGrpcWriter to send them as normal bytes + // If not, we send them as basic bytes over the network connection without making an intermediary ssf span + if flagStruct.Mode == "event" || flagStruct.Mode == "sc" { if flagStruct.ToSSF { logrus.WithField("mode", flagStruct.Mode). Error("Unsupported mode with SSF") return 1 } - logrus.Debug("Sending event") - nconn, _ := net.Dial(netAddr.Network(), netAddr.String()) - pkt, err := buildEventPacket(passedFlags) + + //We are going to be using the "mode" field a lot in order to differentiate between service checks and events + logrus.WithField("mode", flagStruct.Mode). + Debug("Building packet") + + var pkt bytes.Buffer + var err error + if flagStruct.Mode == "event" { + pkt, err = buildEventPacket(passedFlags) + } else { + pkt, err = buildSCPacket(passedFlags) + } if err != nil { - logrus.WithError(err).Error("build event") + logrus.WithField("mode", flagStruct.Mode). + WithError(err). + Error("Error encountered while building packet") return 1 } - nconn.Write(pkt.Bytes()) - logrus.Debugf("Buffer string: %s", pkt.String()) - return 0 - } + logrus.Debugf("Packet Buffer string: %s", pkt.String()) - if flagStruct.Mode == "sc" { - if flagStruct.ToSSF { + if flagStruct.ToGrpc { logrus.WithField("mode", flagStruct.Mode). - Error("Unsupported mode with SSF") - return 1 - } - logrus.Debug("Sending service check") - nconn, _ := net.Dial(netAddr.Network(), netAddr.String()) - pkt, err := buildSCPacket(passedFlags) - if err != nil { - logrus.WithError(err).Error("build event") - return 1 + Debug("Sending packet via gRPC") + + writer, err := newDatadogGrpcWriter(netAddr, flagStruct.HostPort, proxyAddr) + if err != nil { + logrus.WithField("mode", flagStruct.Mode). + WithError(err). + Error("Error encountered while initializing grpcWriter") + return 1 + } + defer writer.Close() + _, err = writer.Write(pkt.Bytes()) + if err != nil { + logrus.WithField("mode", flagStruct.Mode). + WithError(err). + Error("Error encountered while writing to grpcWriter") + return 1 + } + } else { + logrus.WithField("mode", flagStruct.Mode). + Debug("Sending packet") + + nconn, _ := net.Dial(netAddr.Network(), netAddr.String()) + defer nconn.Close() + _, err = nconn.Write(pkt.Bytes()) + if err != nil { + logrus.WithField("mode", flagStruct.Mode). + WithError(err). + Error("Error encountered while sending packet") + return 1 + } } - nconn.Write(pkt.Bytes()) - logrus.Debugf("Buffer string: %s", pkt.String()) + + logrus.WithField("mode", flagStruct.Mode). + Debug("Packet sent") return 0 } @@ -256,37 +309,63 @@ func Main(args []string) int { status, err := createMetric(span, passedFlags, flagStruct.Name, flagStruct.Tag, flagStruct.Command, flagStruct.ExtraArgs) if err != nil { - logrus.WithError(err).Error("Error creating metrics.") + logrus.WithError(err).Error("Error encountered while creating metrics.") return 1 } if flagStruct.ToSSF { - client, err := trace.NewClient(addr) - if err != nil { - logrus.WithError(err). - WithField("address", addr). - Error("Could not construct client") - return 1 - } - defer client.Close() - err = sendSSF(client, span) - if err != nil { - logrus.WithError(err).Error("Could not send SSF span") - return 1 + dialAddr := netAddr.String() + if flagStruct.ToGrpc { // SSF via gRPC + grpcDialOptions := []grpc.DialOption{grpc.WithBlock(), grpc.WithInsecure()} + if proxyAddr != nil { + grpcDialOptions = append(grpcDialOptions, grpc.WithDialer(func(_ string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout(proxyAddr.Network(), proxyAddr.String(), timeout) + })) + // If we are proxying then we want to pass the unresolved hostport to avoid SSL errors + dialAddr = flagStruct.HostPort + } + logrus.WithField("proxying", proxyAddr != nil). + Debugf("Sending SSF metrics via gRPC") + conn, err := grpc.Dial(dialAddr, grpcDialOptions...) + if err != nil { + logrus.WithError(err).Error("Error encountered while dialing grpc ssf server") + return 1 + } + defer conn.Close() + client := ssf.NewSSFGRPCClient(conn) + _, err = client.SendSpan(context.Background(), span) + if err != nil { + logrus.WithError(err).Error("Error encountered while sending ssf span over grpc") + return 1 + } + } else { // SSF via UDP + client, err := trace.NewClient(addr) + if err != nil { + logrus.WithError(err). + WithField("address", addr). + Error("Error encountered while constructing client") + return 1 + } + defer client.Close() + err = sendSSF(client, span) + if err != nil { + logrus.WithError(err).Error("Error encountered while sending SSF span") + return 1 + } } - } else { - if netAddr.Network() != "udp" { + } else { // This is the dogstatsd code block + if netAddr.Network() != "udp" && netAddr.Network() != "tcp" { logrus.WithField("address", addr). WithField("network", netAddr.Network()). - Error("hostport must be a UDP address for statsd metrics") + Error("hostport must be a UDP or TCP address for statsd metrics") return 1 } if len(span.Metrics) == 0 { logrus.Error("No metrics to send. Must pass metric data via at least one of -count, -gauge, -timing, or -set.") return 1 } - err = sendStatsd(netAddr.String(), span) + err = sendStatsd(netAddr, flagStruct.HostPort, span, flagStruct.ToGrpc, proxyAddr) if err != nil { - logrus.WithError(err).Error("Could not send UDP metrics") + logrus.WithError(err).Error("Error encountered while sending metrics") return 1 } } @@ -302,6 +381,7 @@ func flags(args []string) (Flags, map[string]flag.Value, error) { flagset.StringVar(&flagStruct.Mode, "mode", "metric", "Mode for veneur-emit. Must be one of: 'metric', 'event', 'sc'.") flagset.BoolVar(&flagStruct.Debug, "debug", false, "Turns on debug messages.") flagset.BoolVar(&flagStruct.Command, "command", false, "Turns on command-timing mode. veneur-emit will grab everything after the first non-known-flag argument, time its execution, and report it as a timing metric.") + flagset.StringVar(&flagStruct.Proxy, "proxy", "", "Uses the argument (in hostport format) to proxy your emit. (Sets the authority header if using HTTP. This is the equivalent of Host for HTTP/2)") // Metric flags flagset.StringVar(&flagStruct.Name, "name", "", "Name of metric to report. Ex: 'daemontools.service.starts'") @@ -311,6 +391,7 @@ func flags(args []string) (Flags, map[string]flag.Value, error) { flagset.StringVar(&flagStruct.Set, "set", "", "Report a 'set' metric with an arbitrary string value.") flagset.StringVar(&flagStruct.Tag, "tag", "", "Tag(s) for metric, comma separated. Ex: 'service:airflow'. Note: Any tags here are applied to all emitted data. See also mode-specific tag options (e.g. span_tags)") flagset.BoolVar(&flagStruct.ToSSF, "ssf", false, "Sends packets via SSF instead of StatsD. (https://github.com/stripe/veneur/blob/master/ssf/)") + flagset.BoolVar(&flagStruct.ToGrpc, "grpc", false, "Send the metric over grpc (SSF format)") // Event flags // TODO: what should flags be called? @@ -380,15 +461,28 @@ func tagsFromString(csv string) map[string]string { return tags } -func destination(hostport string, useSSF bool) (string, net.Addr, error) { +func destination(hostport string, isGrpc bool) (string, net.Addr, error) { + defaultScheme := "udp" + if isGrpc { + defaultScheme = "tcp" + } if hostport == "" { return "", nil, errors.New("you must specify a valid hostport") } + + hostport, addr, err := resolveHostport(hostport, defaultScheme) + if err != nil { + return "", nil, err + } + return hostport, addr, nil +} + +func resolveHostport(hostport string, defaultScheme string) (string, net.Addr, error) { netAddr, err := protocol.ResolveAddr(hostport) if err != nil { // This is fine - we can attempt to treat the // host:port combination as a UDP address: - hostport = fmt.Sprintf("udp://%s", hostport) + hostport := fmt.Sprintf("%s://%s", defaultScheme, hostport) udpAddr, err := protocol.ResolveAddr(hostport) if err != nil { return "", nil, err @@ -574,10 +668,108 @@ func sendSSF(client *trace.Client, span *ssf.SSFSpan) error { return <-done } +// newDatadogTCPWriter is adapted from https://github.com/DataDog/datadog-go/blob/master/statsd/udp.go +func newDatadogTCPWriter(addr string) (*datadogTCPWriter, error) { + tcpAddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return nil, err + } + conn, err := net.DialTCP("tcp", nil, tcpAddr) + if err != nil { + return nil, err + } + writer := &datadogTCPWriter{conn: conn} + return writer, nil +} + +type datadogTCPWriter struct { + conn net.Conn +} + +func (w *datadogTCPWriter) Write(data []byte) (n int, err error) { + return w.conn.Write(data) +} + +func (w *datadogTCPWriter) SetWriteTimeout(timeout time.Duration) error { + // This is unused in the current implementation + return nil +} + +func (w *datadogTCPWriter) Close() error { + return w.conn.Close() +} + +// newDatadogTCPWriter is adapted from https://github.com/DataDog/datadog-go/blob/master/statsd/udp.go +func newDatadogGrpcWriter(netAddr net.Addr, addr string, proxyAddr net.Addr) (*datadogGrpcWriter, error) { + dialAddr := netAddr.String() + grpcDialOptions := []grpc.DialOption{grpc.WithBlock(), grpc.WithInsecure()} + if proxyAddr != nil { + grpcDialOptions = append(grpcDialOptions, grpc.WithDialer(func(_ string, timeout time.Duration) (net.Conn, error) { + return net.DialTimeout(proxyAddr.Network(), proxyAddr.String(), timeout) + })) + + // If we are proxying then we want to pass the unresolved hostport to avoid SSL errors + dialAddr = addr + } + conn, err := grpc.Dial(dialAddr, grpcDialOptions...) + if err != nil { + logrus.WithError(err).Error("Could not dial grpc dogstatsd server") + return nil, err + } + client := dogstatsd.NewDogstatsdGRPCClient(conn) + writer := &datadogGrpcWriter{client: client, conn: conn} + return writer, nil +} + +type datadogGrpcWriter struct { + client dogstatsd.DogstatsdGRPCClient + conn *grpc.ClientConn +} + +func (w *datadogGrpcWriter) Write(data []byte) (n int, err error) { + metricPacket := &dogstatsd.DogstatsdPacket{} + metricPacket.PacketBytes = data + _, err = w.client.SendPacket(context.Background(), metricPacket) + if err != nil { + logrus.WithError(err).Error("Error encountered while sending dogstatsd over grpc") + return 0, err + } + return len(data), nil +} + +func (w *datadogGrpcWriter) SetWriteTimeout(timeout time.Duration) error { + // This is unused in the current implementation + return nil +} + +func (w *datadogGrpcWriter) Close() error { + return w.conn.Close() +} + // sendStatsd sends the metrics gathered in a span to a dogstatsd // endpoint. -func sendStatsd(addr string, span *ssf.SSFSpan) error { - client, err := statsd.New(addr) +func sendStatsd(netAddr net.Addr, addr string, span *ssf.SSFSpan, useGrpc bool, proxyAddr net.Addr) error { + var client *statsd.Client + var err error + if useGrpc { + writer, err := newDatadogGrpcWriter(netAddr, addr, proxyAddr) + if err == nil { + client, err = statsd.NewWithWriter(writer) + } + } else { + network := netAddr.Network() + switch network { + case "udp": + client, err = statsd.New(netAddr.String()) + case "tcp": + writer, err := newDatadogTCPWriter(netAddr.String()) + if err == nil { + client, err = statsd.NewWithWriter(writer) + } + default: + err = fmt.Errorf("%s is not supported for sending statsd metrics", network) + } + } if err != nil { return err } @@ -608,7 +800,9 @@ func sendStatsd(addr string, span *ssf.SSFSpan) error { return err } } - client.Flush() + //Using Close() instead of Flush() avoids dropping metrics and avoids a potential race condition as called out here: + //https://github.com/DataDog/datadog-go/pull/120 + client.Close() return nil } diff --git a/generate.go b/generate.go index 1aab5dce6..2e0e9b0e6 100644 --- a/generate.go +++ b/generate.go @@ -1,6 +1,8 @@ package veneur //go:generate protoc --gogofaster_out=Mssf/sample.proto=github.com/stripe/veneur/v14/ssf,plugins=grpc:. sinks/grpsink/grpc_sink.proto +//go:generate protoc --gogofaster_out=plugins=grpc:. ssf/grpc.proto +//go:generate protoc --gogofaster_out=plugins=grpc:. protocol/dogstatsd/grpc.proto //go:generate protoc --gogofaster_out=. ssf/sample.proto //go:generate protoc -I=. -I=$GOPATH/pkg/mod -I=$GOPATH/pkg/mod/github.com/gogo/protobuf@v1.2.1/protobuf --gogofaster_out=. tdigest/tdigest.proto //go:generate protoc -I=. -I=$GOPATH/pkg/mod -I=$GOPATH/pkg/mod/github.com/gogo/protobuf@v1.2.1/protobuf --gogofaster_out=Mtdigest/tdigest.proto=github.com/stripe/veneur/v14/tdigest:. samplers/metricpb/metric.proto diff --git a/networking.go b/networking.go index 1ec990282..d3fb305e7 100644 --- a/networking.go +++ b/networking.go @@ -1,6 +1,7 @@ package veneur import ( + "context" "crypto/tls" "fmt" "net" @@ -9,6 +10,8 @@ import ( "sync" "github.com/sirupsen/logrus" + "github.com/stripe/veneur/v14/protocol/dogstatsd" + "github.com/stripe/veneur/v14/ssf" flock "github.com/theckman/go-flock" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -303,6 +306,23 @@ func StartGRPC(s *Server, a net.Addr) net.Addr { return a } +type grpcStatsServer struct { + server *Server +} + +//This is the function that fulfils the ssf server proto +func (grpcsrv *grpcStatsServer) SendPacket(ctx context.Context, packet *dogstatsd.DogstatsdPacket) (*dogstatsd.Empty, error) { + //We use processMetricPacket instead of handleMetricPacket because process can split the byte array into multiple packets if needed + grpcsrv.server.processMetricPacket(len(packet.GetPacketBytes()), packet.GetPacketBytes(), nil) + return &dogstatsd.Empty{}, nil +} + +// This is the function that fulfils the dogstatsd server proto +func (grpcsrv *grpcStatsServer) SendSpan(ctx context.Context, span *ssf.SSFSpan) (*ssf.Empty, error) { + grpcsrv.server.handleSSF(span, "grpc") + return &ssf.Empty{}, nil +} + func startGRPCTCP(s *Server, addr *net.TCPAddr) (*grpc.Server, net.Addr) { listener, err := net.Listen("tcp", addr.String()) if err != nil { @@ -323,7 +343,13 @@ func startGRPCTCP(s *Server, addr *net.TCPAddr) (*grpc.Server, net.Addr) { } healthServer := health.NewServer() healthServer.SetServingStatus("veneur", grpc_health_v1.HealthCheckResponse_SERVING) + + statsServer := &grpcStatsServer{server: s} + grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) + ssf.RegisterSSFGRPCServer(grpcServer, statsServer) + dogstatsd.RegisterDogstatsdGRPCServer(grpcServer, statsServer) + log.WithFields(logrus.Fields{ "address": addr, "mode": mode, }).Info("Listening for metrics on GRPC socket") diff --git a/networking_test.go b/networking_test.go index 3c434af82..d902bd5ea 100644 --- a/networking_test.go +++ b/networking_test.go @@ -14,6 +14,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stripe/veneur/v14/protocol" + "github.com/stripe/veneur/v14/protocol/dogstatsd" + "github.com/stripe/veneur/v14/ssf" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" ) @@ -186,3 +188,75 @@ func TestHealthCheckGRPC(t *testing.T) { } grpcServer.Stop() } + +func TestConnectSSFGRPC(t *testing.T) { + srv := &Server{} + srv.SpanChan = make(chan *ssf.SSFSpan, 100) + + addrNet, err := protocol.ResolveAddr("tcp://127.0.0.1:8181") + require.NoError(t, err) + addr, ok := addrNet.(*net.TCPAddr) + require.True(t, ok) + grpcServer, _ := startGRPCTCP(srv, addr) + + conns := make(chan struct{}) + for i := 0; i < 5; i++ { + go func() { + conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) + defer conn.Close() + require.NoError(t, err) + client := ssf.NewSSFGRPCClient(conn) + _, err = client.SendSpan(context.Background(), &ssf.SSFSpan{}) + require.NoError(t, err) + conns <- struct{}{} + }() + } + + timeout := time.After(3 * time.Second) + for i := 0; i < 5; i++ { + select { + case <-timeout: + t.Fatalf("Timed out waiting for connection, %d made it", i) + case <-conns: + // pass + } + } + grpcServer.Stop() +} + +func TestConnectDogstatsdGRPC(t *testing.T) { + srv := &Server{} + srv.SpanChan = make(chan *ssf.SSFSpan, 100) + + addrNet, err := protocol.ResolveAddr("tcp://127.0.0.1:8181") + require.NoError(t, err) + addr, ok := addrNet.(*net.TCPAddr) + require.True(t, ok) + grpcServer, _ := startGRPCTCP(srv, addr) + + conns := make(chan struct{}) + for i := 0; i < 5; i++ { + go func() { + conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) + defer conn.Close() + require.NoError(t, err) + client := dogstatsd.NewDogstatsdGRPCClient(conn) + metricPacket := &dogstatsd.DogstatsdPacket{} + metricPacket.PacketBytes = nil + _, err = client.SendPacket(context.Background(), metricPacket) + require.NoError(t, err) + conns <- struct{}{} + }() + } + + timeout := time.After(3 * time.Second) + for i := 0; i < 5; i++ { + select { + case <-timeout: + t.Fatalf("Timed out waiting for connection, %d made it", i) + case <-conns: + // pass + } + } + grpcServer.Stop() +} diff --git a/protocol/dogstatsd/grpc.pb.go b/protocol/dogstatsd/grpc.pb.go new file mode 100644 index 000000000..91bfdd102 --- /dev/null +++ b/protocol/dogstatsd/grpc.pb.go @@ -0,0 +1,535 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: protocol/dogstatsd/grpc.proto + +package dogstatsd + +import ( + context "context" + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + io "io" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type Empty struct { +} + +func (m *Empty) Reset() { *m = Empty{} } +func (m *Empty) String() string { return proto.CompactTextString(m) } +func (*Empty) ProtoMessage() {} +func (*Empty) Descriptor() ([]byte, []int) { + return fileDescriptor_10e9369d26df60f3, []int{0} +} +func (m *Empty) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Empty) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Empty.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Empty) XXX_Merge(src proto.Message) { + xxx_messageInfo_Empty.Merge(m, src) +} +func (m *Empty) XXX_Size() int { + return m.Size() +} +func (m *Empty) XXX_DiscardUnknown() { + xxx_messageInfo_Empty.DiscardUnknown(m) +} + +var xxx_messageInfo_Empty proto.InternalMessageInfo + +type DogstatsdPacket struct { + PacketBytes []byte `protobuf:"bytes,1,opt,name=packetBytes,proto3" json:"packetBytes,omitempty"` +} + +func (m *DogstatsdPacket) Reset() { *m = DogstatsdPacket{} } +func (m *DogstatsdPacket) String() string { return proto.CompactTextString(m) } +func (*DogstatsdPacket) ProtoMessage() {} +func (*DogstatsdPacket) Descriptor() ([]byte, []int) { + return fileDescriptor_10e9369d26df60f3, []int{1} +} +func (m *DogstatsdPacket) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *DogstatsdPacket) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_DogstatsdPacket.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *DogstatsdPacket) XXX_Merge(src proto.Message) { + xxx_messageInfo_DogstatsdPacket.Merge(m, src) +} +func (m *DogstatsdPacket) XXX_Size() int { + return m.Size() +} +func (m *DogstatsdPacket) XXX_DiscardUnknown() { + xxx_messageInfo_DogstatsdPacket.DiscardUnknown(m) +} + +var xxx_messageInfo_DogstatsdPacket proto.InternalMessageInfo + +func (m *DogstatsdPacket) GetPacketBytes() []byte { + if m != nil { + return m.PacketBytes + } + return nil +} + +func init() { + proto.RegisterType((*Empty)(nil), "dogstatsd.Empty") + proto.RegisterType((*DogstatsdPacket)(nil), "dogstatsd.DogstatsdPacket") +} + +func init() { proto.RegisterFile("protocol/dogstatsd/grpc.proto", fileDescriptor_10e9369d26df60f3) } + +var fileDescriptor_10e9369d26df60f3 = []byte{ + // 164 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2d, 0x28, 0xca, 0x2f, + 0xc9, 0x4f, 0xce, 0xcf, 0xd1, 0x4f, 0xc9, 0x4f, 0x2f, 0x2e, 0x49, 0x2c, 0x29, 0x4e, 0xd1, 0x4f, + 0x2f, 0x2a, 0x48, 0xd6, 0x03, 0x8b, 0x0b, 0x71, 0xc2, 0x45, 0x95, 0xd8, 0xb9, 0x58, 0x5d, 0x73, + 0x0b, 0x4a, 0x2a, 0x95, 0x8c, 0xb9, 0xf8, 0x5d, 0x60, 0xa2, 0x01, 0x89, 0xc9, 0xd9, 0xa9, 0x25, + 0x42, 0x0a, 0x5c, 0xdc, 0x05, 0x60, 0x96, 0x53, 0x65, 0x49, 0x6a, 0xb1, 0x04, 0xa3, 0x02, 0xa3, + 0x06, 0x4f, 0x10, 0xb2, 0x90, 0x91, 0x37, 0x17, 0x2f, 0x5c, 0x93, 0x7b, 0x50, 0x80, 0xb3, 0x90, + 0x15, 0x17, 0x57, 0x70, 0x6a, 0x1e, 0xcc, 0x00, 0x29, 0x3d, 0xb8, 0x45, 0x7a, 0x68, 0x86, 0x4b, + 0x09, 0x20, 0xc9, 0x81, 0x5d, 0xe0, 0x24, 0x71, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, + 0x0f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, + 0x0c, 0x49, 0x6c, 0x60, 0x67, 0x1b, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, 0xa0, 0x5f, 0x12, 0x15, + 0xd7, 0x00, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// DogstatsdGRPCClient is the client API for DogstatsdGRPC service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type DogstatsdGRPCClient interface { + SendPacket(ctx context.Context, in *DogstatsdPacket, opts ...grpc.CallOption) (*Empty, error) +} + +type dogstatsdGRPCClient struct { + cc *grpc.ClientConn +} + +func NewDogstatsdGRPCClient(cc *grpc.ClientConn) DogstatsdGRPCClient { + return &dogstatsdGRPCClient{cc} +} + +func (c *dogstatsdGRPCClient) SendPacket(ctx context.Context, in *DogstatsdPacket, opts ...grpc.CallOption) (*Empty, error) { + out := new(Empty) + err := c.cc.Invoke(ctx, "/dogstatsd.DogstatsdGRPC/SendPacket", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DogstatsdGRPCServer is the server API for DogstatsdGRPC service. +type DogstatsdGRPCServer interface { + SendPacket(context.Context, *DogstatsdPacket) (*Empty, error) +} + +func RegisterDogstatsdGRPCServer(s *grpc.Server, srv DogstatsdGRPCServer) { + s.RegisterService(&_DogstatsdGRPC_serviceDesc, srv) +} + +func _DogstatsdGRPC_SendPacket_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DogstatsdPacket) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DogstatsdGRPCServer).SendPacket(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/dogstatsd.DogstatsdGRPC/SendPacket", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DogstatsdGRPCServer).SendPacket(ctx, req.(*DogstatsdPacket)) + } + return interceptor(ctx, in, info, handler) +} + +var _DogstatsdGRPC_serviceDesc = grpc.ServiceDesc{ + ServiceName: "dogstatsd.DogstatsdGRPC", + HandlerType: (*DogstatsdGRPCServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SendPacket", + Handler: _DogstatsdGRPC_SendPacket_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "protocol/dogstatsd/grpc.proto", +} + +func (m *Empty) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Empty) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func (m *DogstatsdPacket) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *DogstatsdPacket) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.PacketBytes) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintGrpc(dAtA, i, uint64(len(m.PacketBytes))) + i += copy(dAtA[i:], m.PacketBytes) + } + return i, nil +} + +func encodeVarintGrpc(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Empty) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *DogstatsdPacket) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.PacketBytes) + if l > 0 { + n += 1 + l + sovGrpc(uint64(l)) + } + return n +} + +func sovGrpc(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozGrpc(x uint64) (n int) { + return sovGrpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Empty) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Empty: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Empty: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *DogstatsdPacket) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: DogstatsdPacket: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: DogstatsdPacket: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field PacketBytes", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthGrpc + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthGrpc + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.PacketBytes = append(m.PacketBytes[:0], dAtA[iNdEx:postIndex]...) + if m.PacketBytes == nil { + m.PacketBytes = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipGrpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthGrpc + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipGrpc(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthGrpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowGrpc = fmt.Errorf("proto: integer overflow") +) diff --git a/protocol/dogstatsd/grpc.proto b/protocol/dogstatsd/grpc.proto new file mode 100644 index 000000000..527af8896 --- /dev/null +++ b/protocol/dogstatsd/grpc.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; +package dogstatsd; + +message Empty {} + +message DogstatsdPacket { + bytes packetBytes = 1; +} + +service DogstatsdGRPC { + rpc SendPacket(DogstatsdPacket) returns (Empty); +} \ No newline at end of file diff --git a/server.go b/server.go index 1802521a5..57d372253 100644 --- a/server.go +++ b/server.go @@ -1182,11 +1182,14 @@ func (s *Server) processMetricPacket(numBytes int, buf []byte, packetPool *sync. s.HandleMetricPacket(splitPacket.Chunk()) } - // the Metric struct created by HandleMetricPacket has no byte slices in it, - // only strings - // therefore there are no outstanding references to this byte slice, we - // can return it to the pool - packetPool.Put(buf) + //Only return to the pool if there is a pool + if packetPool != nil { + // the Metric struct created by HandleMetricPacket has no byte slices in it, + // only strings + // therefore there are no outstanding references to this byte slice, we + // can return it to the pool + packetPool.Put(buf) + } } // ReadStatsdDatagramSocket reads statsd metrics packets from connection off a unix datagram socket. diff --git a/ssf/grpc.pb.go b/ssf/grpc.pb.go new file mode 100644 index 000000000..caa1872f8 --- /dev/null +++ b/ssf/grpc.pb.go @@ -0,0 +1,364 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: ssf/grpc.proto + +package ssf + +import ( + context "context" + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + grpc "google.golang.org/grpc" + io "io" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type Empty struct { +} + +func (m *Empty) Reset() { *m = Empty{} } +func (m *Empty) String() string { return proto.CompactTextString(m) } +func (*Empty) ProtoMessage() {} +func (*Empty) Descriptor() ([]byte, []int) { + return fileDescriptor_4a81259b8daae8b4, []int{0} +} +func (m *Empty) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Empty) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Empty.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Empty) XXX_Merge(src proto.Message) { + xxx_messageInfo_Empty.Merge(m, src) +} +func (m *Empty) XXX_Size() int { + return m.Size() +} +func (m *Empty) XXX_DiscardUnknown() { + xxx_messageInfo_Empty.DiscardUnknown(m) +} + +var xxx_messageInfo_Empty proto.InternalMessageInfo + +func init() { + proto.RegisterType((*Empty)(nil), "ssf.Empty") +} + +func init() { proto.RegisterFile("ssf/grpc.proto", fileDescriptor_4a81259b8daae8b4) } + +var fileDescriptor_4a81259b8daae8b4 = []byte{ + // 139 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x2b, 0x2e, 0x4e, 0xd3, + 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2e, 0x2e, 0x4e, 0x93, + 0x12, 0x00, 0x09, 0x16, 0x27, 0xe6, 0x16, 0xe4, 0xa4, 0x42, 0x84, 0x95, 0xd8, 0xb9, 0x58, 0x5d, + 0x73, 0x0b, 0x4a, 0x2a, 0x8d, 0xf4, 0xb9, 0xd8, 0x83, 0x83, 0xdd, 0xdc, 0x83, 0x02, 0x9c, 0x85, + 0x54, 0xb8, 0x38, 0x82, 0x53, 0xf3, 0x52, 0x82, 0x0b, 0x12, 0xf3, 0x84, 0x78, 0xf4, 0x8a, 0x8b, + 0xd3, 0xf4, 0x82, 0x83, 0xdd, 0x40, 0x3c, 0x29, 0x2e, 0x30, 0x0f, 0xac, 0xc1, 0x49, 0xe2, 0xc4, + 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, + 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0x46, 0x1b, 0x03, 0x02, 0x00, + 0x00, 0xff, 0xff, 0xe5, 0x1b, 0x3e, 0x34, 0x83, 0x00, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// SSFGRPCClient is the client API for SSFGRPC service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type SSFGRPCClient interface { + SendSpan(ctx context.Context, in *SSFSpan, opts ...grpc.CallOption) (*Empty, error) +} + +type sSFGRPCClient struct { + cc *grpc.ClientConn +} + +func NewSSFGRPCClient(cc *grpc.ClientConn) SSFGRPCClient { + return &sSFGRPCClient{cc} +} + +func (c *sSFGRPCClient) SendSpan(ctx context.Context, in *SSFSpan, opts ...grpc.CallOption) (*Empty, error) { + out := new(Empty) + err := c.cc.Invoke(ctx, "/ssf.SSFGRPC/SendSpan", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SSFGRPCServer is the server API for SSFGRPC service. +type SSFGRPCServer interface { + SendSpan(context.Context, *SSFSpan) (*Empty, error) +} + +func RegisterSSFGRPCServer(s *grpc.Server, srv SSFGRPCServer) { + s.RegisterService(&_SSFGRPC_serviceDesc, srv) +} + +func _SSFGRPC_SendSpan_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SSFSpan) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SSFGRPCServer).SendSpan(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ssf.SSFGRPC/SendSpan", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SSFGRPCServer).SendSpan(ctx, req.(*SSFSpan)) + } + return interceptor(ctx, in, info, handler) +} + +var _SSFGRPC_serviceDesc = grpc.ServiceDesc{ + ServiceName: "ssf.SSFGRPC", + HandlerType: (*SSFGRPCServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SendSpan", + Handler: _SSFGRPC_SendSpan_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "ssf/grpc.proto", +} + +func (m *Empty) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Empty) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + return i, nil +} + +func encodeVarintGrpc(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Empty) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func sovGrpc(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozGrpc(x uint64) (n int) { + return sovGrpc(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Empty) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowGrpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Empty: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Empty: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: + iNdEx = preIndex + skippy, err := skipGrpc(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthGrpc + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipGrpc(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthGrpc + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowGrpc + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipGrpc(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthGrpc + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthGrpc = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowGrpc = fmt.Errorf("proto: integer overflow") +) diff --git a/ssf/grpc.proto b/ssf/grpc.proto new file mode 100644 index 000000000..bbb8451e6 --- /dev/null +++ b/ssf/grpc.proto @@ -0,0 +1,10 @@ +syntax = "proto3"; +package ssf; + +import "ssf/sample.proto"; + +message Empty {} + +service SSFGRPC { + rpc SendSpan(ssf.SSFSpan) returns (Empty); +}