Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove support for gRPC-Plugin #5388

Merged
merged 12 commits into from
May 11, 2024
6 changes: 0 additions & 6 deletions examples/memstore-plugin/README.md

This file was deleted.

78 changes: 0 additions & 78 deletions examples/memstore-plugin/main.go

This file was deleted.

120 changes: 7 additions & 113 deletions plugin/storage/grpc/config/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// you may not use this file ex cept in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
Expand All @@ -17,11 +17,8 @@ package config
import (
"context"
"fmt"
"os/exec"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
Expand All @@ -34,21 +31,16 @@ import (
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)

var pluginHealthCheckInterval = time.Second * 60

// Configuration describes the options to customize the storage behavior.
type Configuration struct {
PluginBinary string `yaml:"binary" mapstructure:"binary"`
PluginConfigurationFile string `yaml:"configuration-file" mapstructure:"configuration_file"`
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
TenancyOpts tenancy.Options
PluginLogLevel string `yaml:"log-level" mapstructure:"log_level"`
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
TenancyOpts tenancy.Options

pluginHealthCheck *time.Ticker
pluginHealthCheckDone chan bool
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
pluginRPCClient plugin.ClientProtocol
remoteConn *grpc.ClientConn
}

Expand All @@ -74,11 +66,7 @@ type PluginBuilder interface {

// Build instantiates a PluginServices
func (c *Configuration) Build(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
if c.PluginBinary != "" {
return c.buildPlugin(logger, tracerProvider)
} else {
return c.buildRemote(logger, tracerProvider)
}
return c.buildRemote(logger, tracerProvider)
}

func (c *Configuration) Close() error {
Expand Down Expand Up @@ -134,97 +122,3 @@ func (c *Configuration) buildRemote(logger *zap.Logger, tracerProvider trace.Tra
Capabilities: grpcClient,
}, nil
}

func (c *Configuration) buildPlugin(logger *zap.Logger, tracerProvider trace.TracerProvider) (*ClientPluginServices, error) {
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(tracerProvider))),
}

tenancyMgr := tenancy.NewManager(&c.TenancyOpts)
if tenancyMgr.Enabled {
opts = append(opts, grpc.WithUnaryInterceptor(tenancy.NewClientUnaryInterceptor(tenancyMgr)))
opts = append(opts, grpc.WithStreamInterceptor(tenancy.NewClientStreamInterceptor(tenancyMgr)))
}

// #nosec G204
cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile)
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: shared.Handshake,
VersionedPlugins: map[int]plugin.PluginSet{
1: shared.PluginMap,
},
Cmd: cmd,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Logger: hclog.New(&hclog.LoggerOptions{
Level: hclog.LevelFromString(c.PluginLogLevel),
}),
GRPCDialOptions: opts,
})

rpcClient, err := client.Client()
if err != nil {
return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %w", err)
}

raw, err := rpcClient.Dispense(shared.StoragePluginIdentifier)
if err != nil {
return nil, fmt.Errorf("unable to retrieve storage plugin instance: %w", err)
}

// in practice, the type of `raw` is *shared.grpcClient, and type casts below cannot fail
storagePlugin, ok := raw.(shared.StoragePlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.StoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
archiveStoragePlugin, ok := raw.(shared.ArchiveStoragePlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.ArchiveStoragePlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
streamingSpanWriterPlugin, ok := raw.(shared.StreamingSpanWriterPlugin)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.StreamingSpanWriterPlugin for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}
capabilities, ok := raw.(shared.PluginCapabilities)
if !ok {
return nil, fmt.Errorf("unable to cast %T to shared.PluginCapabilities for plugin \"%s\"",
raw, shared.StoragePluginIdentifier)
}

if err := c.startPluginHealthCheck(rpcClient, logger); err != nil {
return nil, fmt.Errorf("initial plugin health check failed: %w", err)
}

return &ClientPluginServices{
PluginServices: shared.PluginServices{
Store: storagePlugin,
ArchiveStore: archiveStoragePlugin,
StreamingSpanWriter: streamingSpanWriterPlugin,
},
Capabilities: capabilities,
killPluginClient: client.Kill,
}, nil
}

func (c *Configuration) startPluginHealthCheck(rpcClient plugin.ClientProtocol, logger *zap.Logger) error {
c.pluginRPCClient = rpcClient
c.pluginHealthCheckDone = make(chan bool)
c.pluginHealthCheck = time.NewTicker(pluginHealthCheckInterval)

go func() {
for {
select {
case <-c.pluginHealthCheckDone:
return
case <-c.pluginHealthCheck.C:
if err := c.pluginRPCClient.Ping(); err != nil {
logger.Fatal("plugin health check failed", zap.Error(err))
}
}
}
}()

return c.pluginRPCClient.Ping()
}
4 changes: 0 additions & 4 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,9 @@ func TestWithConfiguration(t *testing.T) {
v, command := config.Viperize(f.AddFlags)
err := command.ParseFlags([]string{
"--grpc-storage-plugin.log-level=debug",
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
})
require.NoError(t, err)
f.InitFromViper(v, zap.NewNop())
assert.Equal(t, "noop-grpc-plugin", f.options.Configuration.PluginBinary)
assert.Equal(t, "config.json", f.options.Configuration.PluginConfigurationFile)
assert.Equal(t, "debug", f.options.Configuration.PluginLogLevel)
require.NoError(t, f.Close())
}
Expand Down
46 changes: 0 additions & 46 deletions plugin/storage/grpc/grpc.go

This file was deleted.

12 changes: 0 additions & 12 deletions plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package grpc
import (
"flag"
"fmt"
"log"
"time"

"github.com/spf13/viper"
Expand All @@ -28,16 +27,12 @@ import (
)

const (
pluginBinary = "grpc-storage-plugin.binary"
pluginConfigurationFile = "grpc-storage-plugin.configuration-file"
pluginLogLevel = "grpc-storage-plugin.log-level"
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
remotePrefix = "grpc-storage"
remoteServer = remotePrefix + ".server"
remoteConnectionTimeout = remotePrefix + ".connection-timeout"
defaultPluginLogLevel = "warn"
defaultConnectionTimeout = time.Duration(5 * time.Second)

deprecatedSidecar = "(deprecated, will be removed after 2024-03-01) "
)

// Options contains GRPC plugins configs and provides the ability
Expand All @@ -56,17 +51,13 @@ func tlsFlagsConfig() tlscfg.ClientFlagsConfig {
func (opt *Options) AddFlags(flagSet *flag.FlagSet) {
tlsFlagsConfig().AddFlags(flagSet)

flagSet.String(pluginBinary, "", deprecatedSidecar+"The location of the plugin binary")
flagSet.String(pluginConfigurationFile, "", deprecatedSidecar+"A path pointing to the plugin's configuration file, made available to the plugin with the --config arg")
flagSet.String(pluginLogLevel, defaultPluginLogLevel, "Set the log level of the plugin's logger")
flagSet.String(remoteServer, "", "The remote storage gRPC server address as host:port")
flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The remote storage gRPC server connection timeout")
}

// InitFromViper initializes Options with properties from viper
func (opt *Options) InitFromViper(v *viper.Viper) error {
opt.Configuration.PluginBinary = v.GetString(pluginBinary)
opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile)
opt.Configuration.PluginLogLevel = v.GetString(pluginLogLevel)
opt.Configuration.RemoteServerAddr = v.GetString(remoteServer)
var err error
Expand All @@ -76,8 +67,5 @@ func (opt *Options) InitFromViper(v *viper.Viper) error {
}
opt.Configuration.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout)
opt.Configuration.TenancyOpts = tenancy.InitFromViper(v)
if opt.Configuration.PluginBinary != "" {
log.Printf(deprecatedSidecar + "using sidecar model of grpc-plugin storage, please upgrade to 'remote' gRPC storage. https://github.com/jaegertracing/jaeger/issues/4647")
}
return nil
}
6 changes: 0 additions & 6 deletions plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,12 @@ func TestOptionsWithFlags(t *testing.T) {
opts := &Options{}
v, command := config.Viperize(opts.AddFlags, tenancy.AddFlags)
err := command.ParseFlags([]string{
"--grpc-storage-plugin.binary=noop-grpc-plugin",
"--grpc-storage-plugin.configuration-file=config.json",
"--grpc-storage-plugin.log-level=debug",
"--multi-tenancy.header=x-scope-orgid",
})
require.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, "noop-grpc-plugin", opts.Configuration.PluginBinary)
assert.Equal(t, "config.json", opts.Configuration.PluginConfigurationFile)
assert.Equal(t, "debug", opts.Configuration.PluginLogLevel)
assert.False(t, opts.Configuration.TenancyOpts.Enabled)
assert.Equal(t, "x-scope-orgid", opts.Configuration.TenancyOpts.Header)
Expand All @@ -55,7 +51,6 @@ func TestRemoteOptionsWithFlags(t *testing.T) {
require.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, "", opts.Configuration.PluginBinary)
assert.Equal(t, "localhost:2001", opts.Configuration.RemoteServerAddr)
assert.True(t, opts.Configuration.RemoteTLS.Enabled)
assert.Equal(t, 60*time.Second, opts.Configuration.RemoteConnectTimeout)
Expand All @@ -72,7 +67,6 @@ func TestRemoteOptionsNoTLSWithFlags(t *testing.T) {
require.NoError(t, err)
opts.InitFromViper(v)

assert.Equal(t, "", opts.Configuration.PluginBinary)
assert.Equal(t, "localhost:2001", opts.Configuration.RemoteServerAddr)
assert.False(t, opts.Configuration.RemoteTLS.Enabled)
assert.Equal(t, 60*time.Second, opts.Configuration.RemoteConnectTimeout)
Expand Down
Loading
Loading