diff --git a/Gopkg.lock b/Gopkg.lock index c9a06237486..56ed32cdb89 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -449,6 +449,25 @@ pruneopts = "UT" revision = "e80d13ce29ede4452c43dea11e79b9bc8a15b478" +[[projects]] + digest = "1:f6ecb0dc7d965d75815729fd300cc0cd17004fb2d6172a7f37192494936942e5" + name = "github.com/hashicorp/go-hclog" + packages = ["."] + pruneopts = "UT" + revision = "6907afbebd2eef854f0be9194eb79b0ba75d7b29" + version = "v0.8.0" + +[[projects]] + digest = "1:5e1aece859ec4195f3d16dd3b64a0f111e186b9e95d75141465595063e3a5254" + name = "github.com/hashicorp/go-plugin" + packages = [ + ".", + "internal/plugin", + ] + pruneopts = "UT" + revision = "52e1c4730856c1438ced7597c9b5c585a7bd06a2" + version = "v1.0.0" + [[projects]] digest = "1:c0d19ab64b32ce9fe5cf4ddceba78d5bc9807f0016db6b1183599da3dcc24d10" name = "github.com/hashicorp/hcl" @@ -468,6 +487,14 @@ revision = "8cb6e5b959231cc1119e43259c4a608f9c51a241" version = "v1.0.0" +[[projects]] + branch = "master" + digest = "1:a4826c308e84f5f161b90b54a814f0be7d112b80164b9b884698a6903ea47ab3" + name = "github.com/hashicorp/yamux" + packages = ["."] + pruneopts = "UT" + revision = "2f1d1f20f75d5404f53b9edf6b53ed5505508675" + [[projects]] digest = "1:870d441fe217b8e689d7949fef6e43efbc787e50f200cb1e70dbca9204a1d6be" name = "github.com/inconshreveable/mousetrap" @@ -532,6 +559,14 @@ revision = "c12348ce28de40eed0136aa2b644d0ee0650e56c" version = "v1.0.1" +[[projects]] + digest = "1:42eb1f52b84a06820cedc9baec2e710bfbda3ee6dac6cdb97f8b9a5066134ec6" + name = "github.com/mitchellh/go-testing-interface" + packages = ["."] + pruneopts = "UT" + revision = "6d0b8010fcc857872e42fc6c931227569016843c" + version = "v1.0.0" + [[projects]] digest = "1:53bc4cd4914cd7cd52139990d5170d6dc99067ae31c56530621b18b35fc30318" name = "github.com/mitchellh/mapstructure" @@ -580,6 +615,14 @@ revision = "eafdab6b0663b4b528c35975c8b0e78be6e25261" version = "v0.1" +[[projects]] + digest = "1:9ec6cf1df5ad1d55cf41a43b6b1e7e118a91bade4f68ff4303379343e40c0e25" + name = "github.com/oklog/run" + packages = ["."] + pruneopts = "UT" + revision = "4dadeb3030eda0273a12382bb2348ffc7c9d1a39" + version = "v1.0.0" + [[projects]] branch = "master" digest = "1:bccaead3121ab7964fd80fab704f612e5893fb5a2c581d520ec847ed8cfac27e" @@ -924,11 +967,11 @@ [[projects]] branch = "master" - digest = "1:157e5158a7f29d6633eed502880463aa43de3c36d451c7051591efd12f455ac8" + digest = "1:dcd2e9de31e20e01b156288ce4fc239498564295bdbb59d8aaf591af94778162" name = "golang.org/x/sys" packages = ["unix"] pruneopts = "UT" - revision = "a129542de9ae0895210abff9c95d67a1f33cb93d" + revision = "804c0c7841b595338f049388006a3945cb3cfad1" [[projects]] digest = "1:66a2f252a58b4fbbad0e4e180e1d85a83c222b6bce09c3dcdef3dc87c72eda7c" @@ -975,7 +1018,7 @@ "internal/semver", ] pruneopts = "UT" - revision = "ad9eeb80039afa52dec461f6814f3f79c5b45c39" + revision = "550556f78a900afacb9e8b22219f5e5b5c6106db" [[projects]] branch = "master" @@ -989,7 +1032,7 @@ revision = "357c62f0e4bbba7e6cc403ae09edcf3e2b9028fe" [[projects]] - digest = "1:522259a6c3c27bd42604494eb144f40975bebcf51b044e4fddcdef442063c567" + digest = "1:e35762e90451df6e89c7fc9b42f0f0d8ab882e4931fdcec699482e4471640ec2" name = "google.golang.org/grpc" packages = [ ".", @@ -1004,6 +1047,8 @@ "encoding", "encoding/proto", "grpclog", + "health", + "health/grpc_health_v1", "internal", "internal/backoff", "internal/balancerload", @@ -1123,6 +1168,8 @@ "github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway", "github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger", "github.com/grpc-ecosystem/grpc-gateway/protoc-gen-swagger/options", + "github.com/hashicorp/go-hclog", + "github.com/hashicorp/go-plugin", "github.com/kr/pretty", "github.com/opentracing-contrib/go-stdlib/nethttp", "github.com/opentracing/opentracing-go", diff --git a/Gopkg.toml b/Gopkg.toml index 60a45ba5328..f9d56d26542 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -157,3 +157,11 @@ required = [ [[constraint]] name = "github.com/rs/cors" version = "1.3.0" + +[[constraint]] + name = "github.com/hashicorp/go-plugin" + version = "1.0.0" + +[[constraint]] + name = "github.com/hashicorp/go-hclog" + version = "0.8.0" diff --git a/examples/memstore-plugin/empty_test.go b/examples/memstore-plugin/empty_test.go new file mode 100644 index 00000000000..38aae80afae --- /dev/null +++ b/examples/memstore-plugin/empty_test.go @@ -0,0 +1,15 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main diff --git a/examples/memstore-plugin/main.go b/examples/memstore-plugin/main.go new file mode 100644 index 00000000000..6dbd2a045d3 --- /dev/null +++ b/examples/memstore-plugin/main.go @@ -0,0 +1,65 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "flag" + "path" + "strings" + + "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/plugin/storage/grpc" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +var configPath string + +func main() { + flag.StringVar(&configPath, "config", "", "A path to the plugin's configuration file") + flag.Parse() + + if configPath != "" { + viper.SetConfigFile(path.Base(configPath)) + viper.AddConfigPath(path.Dir(configPath)) + } + + v := viper.New() + v.AutomaticEnv() + v.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_")) + + opts := memory.Options{} + opts.InitFromViper(v) + + grpc.Serve(&memoryStore{store: memory.NewStore()}) +} + +type memoryStore struct { + store *memory.Store +} + +func (ns *memoryStore) DependencyReader() dependencystore.Reader { + return ns.store +} + +func (ns *memoryStore) SpanReader() spanstore.Reader { + return ns.store +} + +func (ns *memoryStore) SpanWriter() spanstore.Writer { + return ns.store +} diff --git a/plugin/storage/factory.go b/plugin/storage/factory.go index 5bc0b11be23..44812af864c 100644 --- a/plugin/storage/factory.go +++ b/plugin/storage/factory.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/badger" "github.com/jaegertracing/jaeger/plugin/storage/cassandra" "github.com/jaegertracing/jaeger/plugin/storage/es" + "github.com/jaegertracing/jaeger/plugin/storage/grpc" "github.com/jaegertracing/jaeger/plugin/storage/kafka" "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage" @@ -38,6 +39,7 @@ const ( elasticsearchStorageType = "elasticsearch" memoryStorageType = "memory" kafkaStorageType = "kafka" + grpcPluginStorageType = "grpc-plugin" badgerStorageType = "badger" downsamplingRatio = "downsampling.ratio" downsamplingHashSalt = "downsampling.hashsalt" @@ -48,7 +50,7 @@ const ( defaultDownsamplingHashSalt = "" ) -var allStorageTypes = []string{cassandraStorageType, elasticsearchStorageType, memoryStorageType, kafkaStorageType, badgerStorageType} +var allStorageTypes = []string{cassandraStorageType, elasticsearchStorageType, memoryStorageType, kafkaStorageType, badgerStorageType, grpcPluginStorageType} // Factory implements storage.Factory interface as a meta-factory for storage components. type Factory struct { @@ -90,6 +92,8 @@ func (f *Factory) getFactoryOfType(factoryType string) (storage.Factory, error) return kafka.NewFactory(), nil case badgerStorageType: return badger.NewFactory(), nil + case grpcPluginStorageType: + return grpc.NewFactory(), nil default: return nil, fmt.Errorf("unknown storage type %s. Valid types are %v", factoryType, allStorageTypes) } diff --git a/plugin/storage/grpc/config/config.go b/plugin/storage/grpc/config/config.go new file mode 100644 index 00000000000..e8890d360a5 --- /dev/null +++ b/plugin/storage/grpc/config/config.go @@ -0,0 +1,76 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +import ( + "fmt" + "os/exec" + "runtime" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-plugin" + + "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" +) + +// Configuration describes the options to customize the storage behavior +type Configuration struct { + PluginBinary string `yaml:"binary"` + PluginConfigurationFile string `yaml:"configuration-file"` +} + +// Build instantiates a StoragePlugin +func (c *Configuration) Build() (shared.StoragePlugin, error) { + // #nosec G204 + cmd := exec.Command(c.PluginBinary, "--config", c.PluginConfigurationFile) + + client := plugin.NewClient(&plugin.ClientConfig{ + HandshakeConfig: shared.Handshake, + VersionedPlugins: map[int]plugin.PluginSet{ + 1: shared.PluginMap, + }, + Cmd: cmd, + AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, + Logger: hclog.New(&hclog.LoggerOptions{ + Level: hclog.Warn, + }), + }) + + runtime.SetFinalizer(client, func(c *plugin.Client) { + c.Kill() + }) + + rpcClient, err := client.Client() + if err != nil { + return nil, fmt.Errorf("error attempting to connect to plugin rpc client: %s", err) + } + + raw, err := rpcClient.Dispense(shared.StoragePluginIdentifier) + if err != nil { + return nil, fmt.Errorf("unable to retrieve storage plugin instance: %s", err) + } + + storagePlugin, ok := raw.(shared.StoragePlugin) + if !ok { + return nil, fmt.Errorf("unexpected type for plugin \"%s\"", shared.StoragePluginIdentifier) + } + + return storagePlugin, nil +} + +// PluginBuilder is used to create storage plugins +type PluginBuilder interface { + Build() (shared.StoragePlugin, error) +} diff --git a/plugin/storage/grpc/config/empty_test.go b/plugin/storage/grpc/config/empty_test.go new file mode 100644 index 00000000000..6e632bd9e87 --- /dev/null +++ b/plugin/storage/grpc/config/empty_test.go @@ -0,0 +1,15 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config diff --git a/plugin/storage/grpc/factory.go b/plugin/storage/grpc/factory.go new file mode 100644 index 00000000000..0acbe56f7b7 --- /dev/null +++ b/plugin/storage/grpc/factory.go @@ -0,0 +1,84 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc + +import ( + "flag" + + "github.com/spf13/viper" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" + "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +// Factory implements storage.Factory and creates storage components backed by a storage plugin. +type Factory struct { + options Options + metricsFactory metrics.Factory + logger *zap.Logger + + builder config.PluginBuilder + + store shared.StoragePlugin +} + +// NewFactory creates a new Factory. +func NewFactory() *Factory { + return &Factory{} +} + +// AddFlags implements plugin.Configurable +func (f *Factory) AddFlags(flagSet *flag.FlagSet) { + f.options.AddFlags(flagSet) +} + +// InitFromViper implements plugin.Configurable +func (f *Factory) InitFromViper(v *viper.Viper) { + f.options.InitFromViper(v) + f.builder = &f.options.Configuration +} + +// Initialize implements storage.Factory +func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { + f.metricsFactory, f.logger = metricsFactory, logger + + store, err := f.builder.Build() + if err != nil { + return err + } + + f.store = store + logger.Info("External plugin storage configuration", zap.Any("configuration", f.options.Configuration)) + return nil +} + +// CreateSpanReader implements storage.Factory +func (f *Factory) CreateSpanReader() (spanstore.Reader, error) { + return f.store.SpanReader(), nil +} + +// CreateSpanWriter implements storage.Factory +func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) { + return f.store.SpanWriter(), nil +} + +// CreateDependencyReader implements storage.Factory +func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) { + return f.store.DependencyReader(), nil +} diff --git a/plugin/storage/grpc/factory_test.go b/plugin/storage/grpc/factory_test.go new file mode 100644 index 00000000000..67a2607290f --- /dev/null +++ b/plugin/storage/grpc/factory_test.go @@ -0,0 +1,110 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc + +import ( + "errors" + "testing" + + "github.com/spf13/viper" + "github.com/stretchr/testify/assert" + "github.com/uber/jaeger-lib/metrics" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" + "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/storage/dependencystore" + dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" + "github.com/jaegertracing/jaeger/storage/spanstore" + spanStoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" +) + +var _ storage.Factory = new(Factory) + +type mockPluginBuilder struct { + plugin *mockPlugin + err error +} + +func (b *mockPluginBuilder) Build() (shared.StoragePlugin, error) { + if b.err != nil { + return nil, b.err + } + return b.plugin, nil +} + +type mockPlugin struct { + spanReader spanstore.Reader + spanWriter spanstore.Writer + dependencyReader dependencystore.Reader +} + +func (mp *mockPlugin) SpanReader() spanstore.Reader { + return mp.spanReader +} + +func (mp *mockPlugin) SpanWriter() spanstore.Writer { + return mp.spanWriter +} + +func (mp *mockPlugin) DependencyReader() dependencystore.Reader { + return mp.dependencyReader +} + +func TestGRPCStorageFactory(t *testing.T) { + f := NewFactory() + v := viper.New() + f.InitFromViper(v) + + // after InitFromViper, f.builder points to a real plugin builder that will fail in unit tests, + // so we override it with a mock. + f.builder = &mockPluginBuilder{ + err: errors.New("made-up error"), + } + assert.EqualError(t, f.Initialize(metrics.NullFactory, zap.NewNop()), "made-up error") + + f.builder = &mockPluginBuilder{ + plugin: &mockPlugin{ + spanWriter: new(spanStoreMocks.Writer), + spanReader: new(spanStoreMocks.Reader), + dependencyReader: new(dependencyStoreMocks.Reader), + }, + } + assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) + + assert.NotNil(t, f.store) + reader, err := f.CreateSpanReader() + assert.NoError(t, err) + assert.Equal(t, f.store.SpanReader(), reader) + writer, err := f.CreateSpanWriter() + assert.NoError(t, err) + assert.Equal(t, f.store.SpanWriter(), writer) + depReader, err := f.CreateDependencyReader() + assert.NoError(t, err) + assert.Equal(t, f.store.DependencyReader(), depReader) +} + +func TestWithConfiguration(t *testing.T) { + f := NewFactory() + v, command := config.Viperize(f.AddFlags) + command.ParseFlags([]string{ + "--grpc-storage-plugin.binary=noop-grpc-plugin", + "--grpc-storage-plugin.configuration-file=config.json", + }) + f.InitFromViper(v) + assert.Equal(t, f.options.Configuration.PluginBinary, "noop-grpc-plugin") + assert.Equal(t, f.options.Configuration.PluginConfigurationFile, "config.json") +} diff --git a/plugin/storage/grpc/grpc.go b/plugin/storage/grpc/grpc.go new file mode 100644 index 00000000000..b693e62d459 --- /dev/null +++ b/plugin/storage/grpc/grpc.go @@ -0,0 +1,43 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc + +import ( + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + + "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" +) + +// Serve creates a plugin configuration using the implementation of StoragePlugin and then serves it. +func Serve(implementation shared.StoragePlugin) { + ServeWithGRPCServer(implementation, plugin.DefaultGRPCServer) +} + +// ServeWithGRPCServer creates a plugin configuration using the implementation of StoragePlugin and +// function to create grpcServer, and then serves it. +func ServeWithGRPCServer(implementation shared.StoragePlugin, grpcServer func([]grpc.ServerOption) *grpc.Server) { + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: shared.Handshake, + VersionedPlugins: map[int]plugin.PluginSet{ + 1: map[string]plugin.Plugin{ + shared.StoragePluginIdentifier: &shared.StorageGRPCPlugin{ + Impl: implementation, + }, + }, + }, + GRPCServer: grpcServer, + }) +} diff --git a/plugin/storage/grpc/options.go b/plugin/storage/grpc/options.go new file mode 100644 index 00000000000..82cac8d9311 --- /dev/null +++ b/plugin/storage/grpc/options.go @@ -0,0 +1,44 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc + +import ( + "flag" + + "github.com/spf13/viper" + + "github.com/jaegertracing/jaeger/plugin/storage/grpc/config" +) + +const pluginBinary = "grpc-storage-plugin.binary" +const pluginConfigurationFile = "grpc-storage-plugin.configuration-file" + +// Options contains GRPC plugins configs and provides the ability +// to bind them to command line flags +type Options struct { + Configuration config.Configuration +} + +// AddFlags adds flags for Options +func (opt *Options) AddFlags(flagSet *flag.FlagSet) { + flagSet.String(pluginBinary, "", "The location of the plugin binary") + flagSet.String(pluginConfigurationFile, "", "A path pointing to the plugin's configuration file, made available to the plugin with the --config arg") +} + +// InitFromViper initializes Options with properties from viper +func (opt *Options) InitFromViper(v *viper.Viper) { + opt.Configuration.PluginBinary = v.GetString(pluginBinary) + opt.Configuration.PluginConfigurationFile = v.GetString(pluginConfigurationFile) +} diff --git a/plugin/storage/grpc/options_test.go b/plugin/storage/grpc/options_test.go new file mode 100644 index 00000000000..510a1033e93 --- /dev/null +++ b/plugin/storage/grpc/options_test.go @@ -0,0 +1,36 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package grpc + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/pkg/config" +) + +func TestOptionsWithFlags(t *testing.T) { + opts := &Options{} + v, command := config.Viperize(opts.AddFlags) + command.ParseFlags([]string{ + "--grpc-storage-plugin.binary=noop-grpc-plugin", + "--grpc-storage-plugin.configuration-file=config.json", + }) + opts.InitFromViper(v) + + assert.Equal(t, opts.Configuration.PluginBinary, "noop-grpc-plugin") + assert.Equal(t, opts.Configuration.PluginConfigurationFile, "config.json") +} diff --git a/plugin/storage/grpc/shared/grpc_client.go b/plugin/storage/grpc/shared/grpc_client.go new file mode 100644 index 00000000000..8cf0f6edfa2 --- /dev/null +++ b/plugin/storage/grpc/shared/grpc_client.go @@ -0,0 +1,179 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "io" + "time" + + "github.com/pkg/errors" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +// grpcClient implements shared.StoragePlugin and reads/writes spans and dependencies +type grpcClient struct { + readerClient storage_v1.SpanReaderPluginClient + writerClient storage_v1.SpanWriterPluginClient + depsReaderClient storage_v1.DependenciesReaderPluginClient +} + +// DependencyReader implements shared.StoragePlugin. +func (c *grpcClient) DependencyReader() dependencystore.Reader { + return c +} + +// SpanReader implements shared.StoragePlugin. +func (c *grpcClient) SpanReader() spanstore.Reader { + return c +} + +// SpanWriter implements shared.StoragePlugin. +func (c *grpcClient) SpanWriter() spanstore.Writer { + return c +} + +// GetTrace takes a traceID and returns a Trace associated with that traceID +func (c *grpcClient) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { + stream, err := c.readerClient.GetTrace(ctx, &storage_v1.GetTraceRequest{ + TraceID: traceID, + }) + if err != nil { + return nil, errors.Wrap(err, "plugin error") + } + + trace := model.Trace{} + for received, err := stream.Recv(); err != io.EOF; received, err = stream.Recv() { + if err != nil { + return nil, errors.Wrap(err, "grpc stream error") + } + + for i := range received.Spans { + trace.Spans = append(trace.Spans, &received.Spans[i]) + } + } + + return &trace, nil +} + +// GetServices returns a list of all known services +func (c *grpcClient) GetServices(ctx context.Context) ([]string, error) { + resp, err := c.readerClient.GetServices(ctx, &storage_v1.GetServicesRequest{}) + if err != nil { + return nil, errors.Wrap(err, "plugin error") + } + + return resp.Services, nil +} + +// GetOperations returns the operations of a given service +func (c *grpcClient) GetOperations(ctx context.Context, service string) ([]string, error) { + resp, err := c.readerClient.GetOperations(ctx, &storage_v1.GetOperationsRequest{ + Service: service, + }) + if err != nil { + return nil, errors.Wrap(err, "plugin error") + } + + return resp.Operations, nil +} + +// FindTraces retrieves traces that match the traceQuery +func (c *grpcClient) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { + stream, err := c.readerClient.FindTraces(context.Background(), &storage_v1.FindTracesRequest{ + Query: &storage_v1.TraceQueryParameters{ + ServiceName: query.ServiceName, + OperationName: query.OperationName, + Tags: query.Tags, + StartTimeMin: query.StartTimeMin, + StartTimeMax: query.StartTimeMax, + DurationMin: query.DurationMin, + DurationMax: query.DurationMax, + NumTraces: int32(query.NumTraces), + }, + }) + if err != nil { + return nil, errors.Wrap(err, "plugin error") + } + + var traces []*model.Trace + var trace *model.Trace + var traceID model.TraceID + for received, err := stream.Recv(); err != io.EOF; received, err = stream.Recv() { + if err != nil { + return nil, errors.Wrap(err, "stream error") + } + + for i, span := range received.Spans { + if span.TraceID != traceID { + trace = &model.Trace{} + traceID = span.TraceID + traces = append(traces, trace) + } + trace.Spans = append(trace.Spans, &received.Spans[i]) + } + } + return traces, nil +} + +// FindTraceIDs retrieves traceIDs that match the traceQuery +func (c *grpcClient) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryParameters) ([]model.TraceID, error) { + resp, err := c.readerClient.FindTraceIDs(context.Background(), &storage_v1.FindTraceIDsRequest{ + Query: &storage_v1.TraceQueryParameters{ + ServiceName: query.ServiceName, + OperationName: query.OperationName, + Tags: query.Tags, + StartTimeMin: query.StartTimeMin, + StartTimeMax: query.StartTimeMax, + DurationMin: query.DurationMin, + DurationMax: query.DurationMax, + NumTraces: int32(query.NumTraces), + }, + }) + if err != nil { + return nil, errors.Wrap(err, "plugin error") + } + + return resp.TraceIDs, nil +} + +// WriteSpan saves the span +func (c *grpcClient) WriteSpan(span *model.Span) error { + _, err := c.writerClient.WriteSpan(context.Background(), &storage_v1.WriteSpanRequest{ + Span: span, + }) + if err != nil { + return errors.Wrap(err, "plugin error") + } + + return nil +} + +// GetDependencies returns all interservice dependencies +func (c *grpcClient) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { + resp, err := c.depsReaderClient.GetDependencies(context.Background(), &storage_v1.GetDependenciesRequest{ + EndTime: endTs, + StartTime: endTs.Add(-lookback), + }) + if err != nil { + return nil, errors.Wrap(err, "plugin error") + } + + return resp.Dependencies, nil +} diff --git a/plugin/storage/grpc/shared/grpc_client_test.go b/plugin/storage/grpc/shared/grpc_client_test.go new file mode 100644 index 00000000000..3ea04aa0b9e --- /dev/null +++ b/plugin/storage/grpc/shared/grpc_client_test.go @@ -0,0 +1,257 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "errors" + "io" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + grpcMocks "github.com/jaegertracing/jaeger/proto-gen/storage_v1/mocks" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +var ( + mockTraceID = model.NewTraceID(0, 123456) + mockTraceID2 = model.NewTraceID(0, 123457) + + mockTraceSpans = []model.Span{ + { + TraceID: mockTraceID, + SpanID: model.NewSpanID(1), + Process: &model.Process{}, + }, + { + TraceID: mockTraceID, + SpanID: model.NewSpanID(2), + Process: &model.Process{}, + }, + } + + mockTracesSpans = []model.Span{ + { + TraceID: mockTraceID, + SpanID: model.NewSpanID(1), + Process: &model.Process{}, + }, + { + TraceID: mockTraceID, + SpanID: model.NewSpanID(2), + Process: &model.Process{}, + }, + { + TraceID: mockTraceID2, + SpanID: model.NewSpanID(1), + Process: &model.Process{}, + }, + } +) + +type grpcClientTest struct { + client *grpcClient + spanReader *grpcMocks.SpanReaderPluginClient + spanWriter *grpcMocks.SpanWriterPluginClient + depsReader *grpcMocks.DependenciesReaderPluginClient +} + +func withGRPCClient(fn func(r *grpcClientTest)) { + spanReader := new(grpcMocks.SpanReaderPluginClient) + spanWriter := new(grpcMocks.SpanWriterPluginClient) + depReader := new(grpcMocks.DependenciesReaderPluginClient) + + r := &grpcClientTest{ + client: &grpcClient{ + readerClient: spanReader, + writerClient: spanWriter, + depsReaderClient: depReader, + }, + spanReader: spanReader, + spanWriter: spanWriter, + depsReader: depReader, + } + fn(r) +} + +func TestGRPCClientGetServices(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + r.spanReader.On("GetServices", mock.Anything, &storage_v1.GetServicesRequest{}). + Return(&storage_v1.GetServicesResponse{Services: []string{"service-a"}}, nil) + + s, err := r.client.GetServices(context.Background()) + assert.NoError(t, err) + assert.Equal(t, []string{"service-a"}, s) + }) +} + +func TestGRPCClientGetOperations(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + r.spanReader.On("GetOperations", mock.Anything, &storage_v1.GetOperationsRequest{ + Service: "service-a", + }).Return(&storage_v1.GetOperationsResponse{ + Operations: []string{"operation-a"}, + }, nil) + + s, err := r.client.GetOperations(context.Background(), "service-a") + assert.NoError(t, err) + assert.Equal(t, []string{"operation-a"}, s) + }) +} + +func TestGRPCClientGetTrace(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + traceClient := new(grpcMocks.SpanReaderPlugin_GetTraceClient) + traceClient.On("Recv").Return(&storage_v1.SpansResponseChunk{ + Spans: mockTraceSpans, + }, nil).Once() + traceClient.On("Recv").Return(nil, io.EOF) + r.spanReader.On("GetTrace", mock.Anything, &storage_v1.GetTraceRequest{ + TraceID: mockTraceID, + }).Return(traceClient, nil) + + var expectedSpans []*model.Span + for i := range mockTraceSpans { + expectedSpans = append(expectedSpans, &mockTraceSpans[i]) + } + + s, err := r.client.GetTrace(context.Background(), mockTraceID) + assert.NoError(t, err) + assert.Equal(t, &model.Trace{ + Spans: expectedSpans, + }, s) + }) +} + +func TestGRPCClientGetTrace_StreamError(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + traceClient := new(grpcMocks.SpanReaderPlugin_GetTraceClient) + traceClient.On("Recv").Return(nil, errors.New("an error")) + r.spanReader.On("GetTrace", mock.Anything, &storage_v1.GetTraceRequest{ + TraceID: mockTraceID, + }).Return(traceClient, nil) + + s, err := r.client.GetTrace(context.Background(), mockTraceID) + assert.Error(t, err) + assert.Nil(t, s) + }) +} + +func TestGRPCClientGetTrace_NoTrace(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + r.spanReader.On("GetTrace", mock.Anything, &storage_v1.GetTraceRequest{ + TraceID: mockTraceID, + }).Return(nil, spanstore.ErrTraceNotFound) + + s, err := r.client.GetTrace(context.Background(), mockTraceID) + assert.Error(t, err) + assert.Nil(t, s) + }) +} + +func TestGRPCClientFindTraces(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + traceClient := new(grpcMocks.SpanReaderPlugin_FindTracesClient) + traceClient.On("Recv").Return(&storage_v1.SpansResponseChunk{ + Spans: mockTracesSpans, + }, nil).Once() + traceClient.On("Recv").Return(nil, io.EOF) + r.spanReader.On("FindTraces", mock.Anything, &storage_v1.FindTracesRequest{ + Query: &storage_v1.TraceQueryParameters{}, + }).Return(traceClient, nil) + + s, err := r.client.FindTraces(context.Background(), &spanstore.TraceQueryParameters{}) + assert.NoError(t, err) + assert.NotNil(t, s) + assert.Equal(t, 2, len(s)) + }) +} + +func TestGRPCClientFindTraces_Error(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + r.spanReader.On("FindTraces", mock.Anything, &storage_v1.FindTracesRequest{ + Query: &storage_v1.TraceQueryParameters{}, + }).Return(nil, errors.New("an error")) + + s, err := r.client.FindTraces(context.Background(), &spanstore.TraceQueryParameters{}) + assert.Error(t, err) + assert.Nil(t, s) + }) +} + +func TestGRPCClientFindTraces_RecvError(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + traceClient := new(grpcMocks.SpanReaderPlugin_FindTracesClient) + traceClient.On("Recv").Return(nil, errors.New("an error")) + r.spanReader.On("FindTraces", mock.Anything, &storage_v1.FindTracesRequest{ + Query: &storage_v1.TraceQueryParameters{}, + }).Return(traceClient, nil) + + s, err := r.client.FindTraces(context.Background(), &spanstore.TraceQueryParameters{}) + assert.Error(t, err) + assert.Nil(t, s) + }) +} + +func TestGRPCClientFindTraceIDs(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + r.spanReader.On("FindTraceIDs", mock.Anything, &storage_v1.FindTraceIDsRequest{ + Query: &storage_v1.TraceQueryParameters{}, + }).Return(&storage_v1.FindTraceIDsResponse{ + TraceIDs: []model.TraceID{mockTraceID, mockTraceID2}, + }, nil) + + s, err := r.client.FindTraceIDs(context.Background(), &spanstore.TraceQueryParameters{}) + assert.NoError(t, err) + assert.Equal(t, []model.TraceID{mockTraceID, mockTraceID2}, s) + }) +} + +func TestGRPCClientWriteSpan(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + r.spanWriter.On("WriteSpan", mock.Anything, &storage_v1.WriteSpanRequest{ + Span: &mockTraceSpans[0], + }).Return(&storage_v1.WriteSpanResponse{}, nil) + + err := r.client.WriteSpan(&mockTraceSpans[0]) + assert.NoError(t, err) + }) +} + +func TestGRPCClientGetDependencies(t *testing.T) { + withGRPCClient(func(r *grpcClientTest) { + lookback := time.Duration(1 * time.Second) + end := time.Now() + deps := []model.DependencyLink{ + { + Source: "source", + Child: "child", + }, + } + r.depsReader.On("GetDependencies", mock.Anything, &storage_v1.GetDependenciesRequest{ + StartTime: end.Add(-lookback), + EndTime: end, + }).Return(&storage_v1.GetDependenciesResponse{Dependencies: deps}, nil) + + s, err := r.client.GetDependencies(end, lookback) + assert.NoError(t, err) + assert.Equal(t, deps, s) + }) +} diff --git a/plugin/storage/grpc/shared/grpc_server.go b/plugin/storage/grpc/shared/grpc_server.go new file mode 100644 index 00000000000..1db0ee1aa28 --- /dev/null +++ b/plugin/storage/grpc/shared/grpc_server.go @@ -0,0 +1,150 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + + "github.com/pkg/errors" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +const spanBatchSize = 1000 + +// grpcServer implements shared.StoragePlugin and reads/writes spans and dependencies +type grpcServer struct { + Impl StoragePlugin +} + +// GetDependencies returns all interservice dependencies +func (s *grpcServer) GetDependencies(ctx context.Context, r *storage_v1.GetDependenciesRequest) (*storage_v1.GetDependenciesResponse, error) { + deps, err := s.Impl.DependencyReader().GetDependencies(r.EndTime, r.EndTime.Sub(r.StartTime)) + if err != nil { + return nil, err + } + return &storage_v1.GetDependenciesResponse{ + Dependencies: deps, + }, nil +} + +// WriteSpan saves the span +func (s *grpcServer) WriteSpan(ctx context.Context, r *storage_v1.WriteSpanRequest) (*storage_v1.WriteSpanResponse, error) { + err := s.Impl.SpanWriter().WriteSpan(r.Span) + if err != nil { + return nil, err + } + return &storage_v1.WriteSpanResponse{}, nil +} + +// GetTrace takes a traceID and streams a Trace associated with that traceID +func (s *grpcServer) GetTrace(r *storage_v1.GetTraceRequest, stream storage_v1.SpanReaderPlugin_GetTraceServer) error { + trace, err := s.Impl.SpanReader().GetTrace(stream.Context(), r.TraceID) + if err != nil { + return err + } + + err = s.sendSpans(trace.Spans, stream.Send) + if err != nil { + return err + } + + return nil +} + +// GetServices returns a list of all known services +func (s *grpcServer) GetServices(ctx context.Context, r *storage_v1.GetServicesRequest) (*storage_v1.GetServicesResponse, error) { + services, err := s.Impl.SpanReader().GetServices(ctx) + if err != nil { + return nil, err + } + return &storage_v1.GetServicesResponse{ + Services: services, + }, nil +} + +// GetOperations returns the operations of a given service +func (s *grpcServer) GetOperations(ctx context.Context, r *storage_v1.GetOperationsRequest) (*storage_v1.GetOperationsResponse, error) { + operations, err := s.Impl.SpanReader().GetOperations(ctx, r.Service) + if err != nil { + return nil, err + } + return &storage_v1.GetOperationsResponse{ + Operations: operations, + }, nil +} + +// FindTraces streams traces that match the traceQuery +func (s *grpcServer) FindTraces(r *storage_v1.FindTracesRequest, stream storage_v1.SpanReaderPlugin_FindTracesServer) error { + traces, err := s.Impl.SpanReader().FindTraces(stream.Context(), &spanstore.TraceQueryParameters{ + ServiceName: r.Query.ServiceName, + OperationName: r.Query.OperationName, + Tags: r.Query.Tags, + StartTimeMin: r.Query.StartTimeMin, + StartTimeMax: r.Query.StartTimeMax, + DurationMin: r.Query.DurationMin, + DurationMax: r.Query.DurationMax, + NumTraces: int(r.Query.NumTraces), + }) + if err != nil { + return err + } + + for _, trace := range traces { + err = s.sendSpans(trace.Spans, stream.Send) + if err != nil { + return err + } + } + + return nil +} + +// FindTraceIDs retrieves traceIDs that match the traceQuery +func (s *grpcServer) FindTraceIDs(ctx context.Context, r *storage_v1.FindTraceIDsRequest) (*storage_v1.FindTraceIDsResponse, error) { + traceIDs, err := s.Impl.SpanReader().FindTraceIDs(ctx, &spanstore.TraceQueryParameters{ + ServiceName: r.Query.ServiceName, + OperationName: r.Query.OperationName, + Tags: r.Query.Tags, + StartTimeMin: r.Query.StartTimeMin, + StartTimeMax: r.Query.StartTimeMax, + DurationMin: r.Query.DurationMin, + DurationMax: r.Query.DurationMax, + NumTraces: int(r.Query.NumTraces), + }) + if err != nil { + return nil, err + } + return &storage_v1.FindTraceIDsResponse{ + TraceIDs: traceIDs, + }, nil +} + +func (s *grpcServer) sendSpans(spans []*model.Span, sendFn func(*storage_v1.SpansResponseChunk) error) error { + chunk := make([]model.Span, 0, len(spans)) + for i := 0; i < len(spans); i += spanBatchSize { + chunk = chunk[:0] + for j := i; j < len(spans) && j < i+spanBatchSize; j++ { + chunk = append(chunk, *spans[j]) + } + if err := sendFn(&storage_v1.SpansResponseChunk{Spans: chunk}); err != nil { + return errors.Wrap(err, "grpc plugin failed to send response") + } + } + + return nil +} diff --git a/plugin/storage/grpc/shared/grpc_server_test.go b/plugin/storage/grpc/shared/grpc_server_test.go new file mode 100644 index 00000000000..01f4f2d23e2 --- /dev/null +++ b/plugin/storage/grpc/shared/grpc_server_test.go @@ -0,0 +1,199 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + grpcMocks "github.com/jaegertracing/jaeger/proto-gen/storage_v1/mocks" + "github.com/jaegertracing/jaeger/storage/dependencystore" + dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" + "github.com/jaegertracing/jaeger/storage/spanstore" + spanStoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" +) + +type mockStoragePlugin struct { + spanReader *spanStoreMocks.Reader + spanWriter *spanStoreMocks.Writer + depsReader *dependencyStoreMocks.Reader +} + +func (plugin *mockStoragePlugin) SpanReader() spanstore.Reader { + return plugin.spanReader +} + +func (plugin *mockStoragePlugin) SpanWriter() spanstore.Writer { + return plugin.spanWriter +} + +func (plugin *mockStoragePlugin) DependencyReader() dependencystore.Reader { + return plugin.depsReader +} + +type grpcServerTest struct { + server *grpcServer + impl *mockStoragePlugin +} + +func withGRPCServer(fn func(r *grpcServerTest)) { + spanReader := new(spanStoreMocks.Reader) + spanWriter := new(spanStoreMocks.Writer) + depReader := new(dependencyStoreMocks.Reader) + + impl := &mockStoragePlugin{ + spanWriter: spanWriter, + spanReader: spanReader, + depsReader: depReader, + } + + r := &grpcServerTest{ + server: &grpcServer{ + Impl: impl, + }, + impl: impl, + } + fn(r) +} + +func TestGRPCServerGetServices(t *testing.T) { + withGRPCServer(func(r *grpcServerTest) { + r.impl.spanReader.On("GetServices", mock.Anything). + Return([]string{"service-a"}, nil) + + s, err := r.server.GetServices(context.Background(), &storage_v1.GetServicesRequest{}) + assert.NoError(t, err) + assert.Equal(t, &storage_v1.GetServicesResponse{Services: []string{"service-a"}}, s) + }) +} + +func TestGRPCServerGetOperations(t *testing.T) { + withGRPCServer(func(r *grpcServerTest) { + r.impl.spanReader.On("GetOperations", mock.Anything, "service-a"). + Return([]string{"operation-a"}, nil) + + s, err := r.server.GetOperations(context.Background(), &storage_v1.GetOperationsRequest{ + Service: "service-a", + }) + assert.NoError(t, err) + assert.Equal(t, &storage_v1.GetOperationsResponse{Operations: []string{"operation-a"}}, s) + }) +} + +func TestGRPCServerGetTrace(t *testing.T) { + withGRPCServer(func(r *grpcServerTest) { + traceSteam := new(grpcMocks.SpanReaderPlugin_GetTraceServer) + traceSteam.On("Context").Return(context.Background()) + traceSteam.On("Send", &storage_v1.SpansResponseChunk{Spans: mockTraceSpans}). + Return(nil) + + var traceSpans []*model.Span + for i := range mockTraceSpans { + traceSpans = append(traceSpans, &mockTraceSpans[i]) + } + r.impl.spanReader.On("GetTrace", mock.Anything, mockTraceID). + Return(&model.Trace{Spans: traceSpans}, nil) + + err := r.server.GetTrace(&storage_v1.GetTraceRequest{ + TraceID: mockTraceID, + }, traceSteam) + assert.NoError(t, err) + }) +} + +func TestGRPCServerFindTraces(t *testing.T) { + withGRPCServer(func(r *grpcServerTest) { + traceSteam := new(grpcMocks.SpanReaderPlugin_FindTracesServer) + traceSteam.On("Context").Return(context.Background()) + traceSteam.On("Send", &storage_v1.SpansResponseChunk{Spans: mockTracesSpans[:2]}). + Return(nil).Once() + traceSteam.On("Send", &storage_v1.SpansResponseChunk{Spans: mockTracesSpans[2:]}). + Return(nil).Once() + + var traces []*model.Trace + var traceID model.TraceID + var trace *model.Trace + for i, span := range mockTracesSpans { + if span.TraceID != traceID { + trace = &model.Trace{} + traceID = span.TraceID + traces = append(traces, trace) + } + trace.Spans = append(trace.Spans, &mockTracesSpans[i]) + } + + r.impl.spanReader.On("FindTraces", mock.Anything, &spanstore.TraceQueryParameters{}). + Return(traces, nil) + + err := r.server.FindTraces(&storage_v1.FindTracesRequest{ + Query: &storage_v1.TraceQueryParameters{}, + }, traceSteam) + assert.NoError(t, err) + }) +} + +func TestGRPCServerFindTraceIDs(t *testing.T) { + withGRPCServer(func(r *grpcServerTest) { + r.impl.spanReader.On("FindTraceIDs", mock.Anything, &spanstore.TraceQueryParameters{}). + Return([]model.TraceID{mockTraceID, mockTraceID2}, nil) + + s, err := r.server.FindTraceIDs(context.Background(), &storage_v1.FindTraceIDsRequest{ + Query: &storage_v1.TraceQueryParameters{}, + }) + assert.NoError(t, err) + assert.Equal(t, &storage_v1.FindTraceIDsResponse{TraceIDs: []model.TraceID{mockTraceID, mockTraceID2}}, s) + }) +} + +func TestGRPCServerWriteSpan(t *testing.T) { + withGRPCServer(func(r *grpcServerTest) { + r.impl.spanWriter.On("WriteSpan", &mockTraceSpans[0]). + Return(nil) + + s, err := r.server.WriteSpan(context.Background(), &storage_v1.WriteSpanRequest{ + Span: &mockTraceSpans[0], + }) + assert.NoError(t, err) + assert.Equal(t, &storage_v1.WriteSpanResponse{}, s) + }) +} + +func TestGRPCServerGetDependencies(t *testing.T) { + withGRPCServer(func(r *grpcServerTest) { + lookback := time.Duration(1 * time.Second) + end := time.Now() + deps := []model.DependencyLink{ + { + Source: "source", + Child: "child", + }, + } + r.impl.depsReader.On("GetDependencies", end, lookback). + Return(deps, nil) + + s, err := r.server.GetDependencies(context.Background(), &storage_v1.GetDependenciesRequest{ + StartTime: end.Add(-lookback), + EndTime: end, + }) + assert.NoError(t, err) + assert.Equal(t, &storage_v1.GetDependenciesResponse{Dependencies: deps}, s) + }) +} diff --git a/plugin/storage/grpc/shared/interface.go b/plugin/storage/grpc/shared/interface.go new file mode 100644 index 00000000000..24d62854151 --- /dev/null +++ b/plugin/storage/grpc/shared/interface.go @@ -0,0 +1,73 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package shared + +import ( + "context" + + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" + + "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + "github.com/jaegertracing/jaeger/storage/dependencystore" + "github.com/jaegertracing/jaeger/storage/spanstore" +) + +// StoragePluginIdentifier is the identifier that is shared by plugin and host. +const StoragePluginIdentifier = "storage_plugin" + +// Handshake is a common handshake that is shared by plugin and host. +var Handshake = plugin.HandshakeConfig{ + MagicCookieKey: "STORAGE_PLUGIN", + MagicCookieValue: "jaeger", +} + +// PluginMap is the map of plugins we can dispense. +var PluginMap = map[string]plugin.Plugin{ + StoragePluginIdentifier: &StorageGRPCPlugin{}, +} + +// StoragePlugin is the interface we're exposing as a plugin. +type StoragePlugin interface { + SpanReader() spanstore.Reader + SpanWriter() spanstore.Writer + DependencyReader() dependencystore.Reader +} + +// StorageGRPCPlugin is the implementation of plugin.GRPCPlugin so we can serve/consume this. +type StorageGRPCPlugin struct { + plugin.Plugin + // Concrete implementation, written in Go. This is only used for plugins + // that are written in Go. + Impl StoragePlugin +} + +// GRPCServer is used by go-plugin to create a grpc plugin server +func (p *StorageGRPCPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + server := &grpcServer{Impl: p.Impl} + storage_v1.RegisterSpanReaderPluginServer(s, server) + storage_v1.RegisterSpanWriterPluginServer(s, server) + storage_v1.RegisterDependenciesReaderPluginServer(s, server) + return nil +} + +// GRPCClient is used by go-plugin to create a grpc plugin client +func (*StorageGRPCPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) { + return &grpcClient{ + readerClient: storage_v1.NewSpanReaderPluginClient(c), + writerClient: storage_v1.NewSpanWriterPluginClient(c), + depsReaderClient: storage_v1.NewDependenciesReaderPluginClient(c), + }, nil +} diff --git a/plugin/storage/integration/grpc_test.go b/plugin/storage/integration/grpc_test.go new file mode 100644 index 00000000000..ea0f2f7999a --- /dev/null +++ b/plugin/storage/integration/grpc_test.go @@ -0,0 +1,83 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "os" + "testing" + + "github.com/uber/jaeger-lib/metrics" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/jaegertracing/jaeger/pkg/config" + "github.com/jaegertracing/jaeger/pkg/testutils" + "github.com/jaegertracing/jaeger/plugin/storage/grpc" +) + +type GRPCStorageIntegrationTestSuite struct { + StorageIntegration + logger *zap.Logger +} + +func (s *GRPCStorageIntegrationTestSuite) initialize() error { + s.logger, _ = testutils.NewLogger() + gopath := os.Getenv("GOPATH") + path := gopath + "/src/github.com/jaegertracing/jaeger/examples/memstore-plugin/memstore-plugin" + + f := grpc.NewFactory() + v, command := config.Viperize(f.AddFlags) + err := command.ParseFlags([]string{ + "--grpc-storage-plugin.binary", + path, + }) + if err != nil { + return err + } + f.InitFromViper(v) + if err := f.Initialize(metrics.NullFactory, s.logger); err != nil { + return err + } + + if s.SpanWriter, err = f.CreateSpanWriter(); err != nil { + return err + } + if s.SpanReader, err = f.CreateSpanReader(); err != nil { + return err + } + + // TODO DependencyWriter is not implemented in grpc store + + s.Refresh = s.refresh + s.CleanUp = s.cleanUp + return nil +} + +func (s *GRPCStorageIntegrationTestSuite) refresh() error { + return nil +} + +func (s *GRPCStorageIntegrationTestSuite) cleanUp() error { + return s.initialize() +} + +func TestGRPCStorage(t *testing.T) { + if os.Getenv("STORAGE") != "grpc-plugin" { + t.Skip("Integration test against grpc skipped; set STORAGE env var to grpc-plugin to run this") + } + s := &GRPCStorageIntegrationTestSuite{} + require.NoError(t, s.initialize()) + s.IntegrationTestAll(t) +} diff --git a/proto-gen/storage_v1/mocks/DependenciesReaderPluginClient.go b/proto-gen/storage_v1/mocks/DependenciesReaderPluginClient.go new file mode 100644 index 00000000000..ba05531c915 --- /dev/null +++ b/proto-gen/storage_v1/mocks/DependenciesReaderPluginClient.go @@ -0,0 +1,43 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import grpc "google.golang.org/grpc" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// DependenciesReaderPluginClient is an autogenerated mock type for the DependenciesReaderPluginClient type +type DependenciesReaderPluginClient struct { + mock.Mock +} + +// GetDependencies provides a mock function with given fields: ctx, in, opts +func (_m *DependenciesReaderPluginClient) GetDependencies(ctx context.Context, in *storage_v1.GetDependenciesRequest, opts ...grpc.CallOption) (*storage_v1.GetDependenciesResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *storage_v1.GetDependenciesResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.GetDependenciesRequest, ...grpc.CallOption) *storage_v1.GetDependenciesResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.GetDependenciesResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.GetDependenciesRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/proto-gen/storage_v1/mocks/DependenciesReaderPluginServer.go b/proto-gen/storage_v1/mocks/DependenciesReaderPluginServer.go new file mode 100644 index 00000000000..7d126aeeada --- /dev/null +++ b/proto-gen/storage_v1/mocks/DependenciesReaderPluginServer.go @@ -0,0 +1,35 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// DependenciesReaderPluginServer is an autogenerated mock type for the DependenciesReaderPluginServer type +type DependenciesReaderPluginServer struct { + mock.Mock +} + +// GetDependencies provides a mock function with given fields: _a0, _a1 +func (_m *DependenciesReaderPluginServer) GetDependencies(_a0 context.Context, _a1 *storage_v1.GetDependenciesRequest) (*storage_v1.GetDependenciesResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *storage_v1.GetDependenciesResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.GetDependenciesRequest) *storage_v1.GetDependenciesResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.GetDependenciesResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.GetDependenciesRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/proto-gen/storage_v1/mocks/SpanReaderPluginClient.go b/proto-gen/storage_v1/mocks/SpanReaderPluginClient.go new file mode 100644 index 00000000000..2e096d1ea1d --- /dev/null +++ b/proto-gen/storage_v1/mocks/SpanReaderPluginClient.go @@ -0,0 +1,163 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import grpc "google.golang.org/grpc" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// SpanReaderPluginClient is an autogenerated mock type for the SpanReaderPluginClient type +type SpanReaderPluginClient struct { + mock.Mock +} + +// FindTraceIDs provides a mock function with given fields: ctx, in, opts +func (_m *SpanReaderPluginClient) FindTraceIDs(ctx context.Context, in *storage_v1.FindTraceIDsRequest, opts ...grpc.CallOption) (*storage_v1.FindTraceIDsResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *storage_v1.FindTraceIDsResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.FindTraceIDsRequest, ...grpc.CallOption) *storage_v1.FindTraceIDsResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.FindTraceIDsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.FindTraceIDsRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FindTraces provides a mock function with given fields: ctx, in, opts +func (_m *SpanReaderPluginClient) FindTraces(ctx context.Context, in *storage_v1.FindTracesRequest, opts ...grpc.CallOption) (storage_v1.SpanReaderPlugin_FindTracesClient, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 storage_v1.SpanReaderPlugin_FindTracesClient + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.FindTracesRequest, ...grpc.CallOption) storage_v1.SpanReaderPlugin_FindTracesClient); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage_v1.SpanReaderPlugin_FindTracesClient) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.FindTracesRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetOperations provides a mock function with given fields: ctx, in, opts +func (_m *SpanReaderPluginClient) GetOperations(ctx context.Context, in *storage_v1.GetOperationsRequest, opts ...grpc.CallOption) (*storage_v1.GetOperationsResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *storage_v1.GetOperationsResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.GetOperationsRequest, ...grpc.CallOption) *storage_v1.GetOperationsResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.GetOperationsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.GetOperationsRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetServices provides a mock function with given fields: ctx, in, opts +func (_m *SpanReaderPluginClient) GetServices(ctx context.Context, in *storage_v1.GetServicesRequest, opts ...grpc.CallOption) (*storage_v1.GetServicesResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *storage_v1.GetServicesResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.GetServicesRequest, ...grpc.CallOption) *storage_v1.GetServicesResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.GetServicesResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.GetServicesRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTrace provides a mock function with given fields: ctx, in, opts +func (_m *SpanReaderPluginClient) GetTrace(ctx context.Context, in *storage_v1.GetTraceRequest, opts ...grpc.CallOption) (storage_v1.SpanReaderPlugin_GetTraceClient, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 storage_v1.SpanReaderPlugin_GetTraceClient + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.GetTraceRequest, ...grpc.CallOption) storage_v1.SpanReaderPlugin_GetTraceClient); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(storage_v1.SpanReaderPlugin_GetTraceClient) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.GetTraceRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/proto-gen/storage_v1/mocks/SpanReaderPluginServer.go b/proto-gen/storage_v1/mocks/SpanReaderPluginServer.go new file mode 100644 index 00000000000..622b64c0657 --- /dev/null +++ b/proto-gen/storage_v1/mocks/SpanReaderPluginServer.go @@ -0,0 +1,109 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// SpanReaderPluginServer is an autogenerated mock type for the SpanReaderPluginServer type +type SpanReaderPluginServer struct { + mock.Mock +} + +// FindTraceIDs provides a mock function with given fields: _a0, _a1 +func (_m *SpanReaderPluginServer) FindTraceIDs(_a0 context.Context, _a1 *storage_v1.FindTraceIDsRequest) (*storage_v1.FindTraceIDsResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *storage_v1.FindTraceIDsResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.FindTraceIDsRequest) *storage_v1.FindTraceIDsResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.FindTraceIDsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.FindTraceIDsRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// FindTraces provides a mock function with given fields: _a0, _a1 +func (_m *SpanReaderPluginServer) FindTraces(_a0 *storage_v1.FindTracesRequest, _a1 storage_v1.SpanReaderPlugin_FindTracesServer) error { + ret := _m.Called(_a0, _a1) + + var r0 error + if rf, ok := ret.Get(0).(func(*storage_v1.FindTracesRequest, storage_v1.SpanReaderPlugin_FindTracesServer) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetOperations provides a mock function with given fields: _a0, _a1 +func (_m *SpanReaderPluginServer) GetOperations(_a0 context.Context, _a1 *storage_v1.GetOperationsRequest) (*storage_v1.GetOperationsResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *storage_v1.GetOperationsResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.GetOperationsRequest) *storage_v1.GetOperationsResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.GetOperationsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.GetOperationsRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetServices provides a mock function with given fields: _a0, _a1 +func (_m *SpanReaderPluginServer) GetServices(_a0 context.Context, _a1 *storage_v1.GetServicesRequest) (*storage_v1.GetServicesResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *storage_v1.GetServicesResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.GetServicesRequest) *storage_v1.GetServicesResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.GetServicesResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.GetServicesRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// GetTrace provides a mock function with given fields: _a0, _a1 +func (_m *SpanReaderPluginServer) GetTrace(_a0 *storage_v1.GetTraceRequest, _a1 storage_v1.SpanReaderPlugin_GetTraceServer) error { + ret := _m.Called(_a0, _a1) + + var r0 error + if rf, ok := ret.Get(0).(func(*storage_v1.GetTraceRequest, storage_v1.SpanReaderPlugin_GetTraceServer) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/proto-gen/storage_v1/mocks/SpanReaderPlugin_FindTracesClient.go b/proto-gen/storage_v1/mocks/SpanReaderPlugin_FindTracesClient.go new file mode 100644 index 00000000000..b0a6007f75f --- /dev/null +++ b/proto-gen/storage_v1/mocks/SpanReaderPlugin_FindTracesClient.go @@ -0,0 +1,133 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import metadata "google.golang.org/grpc/metadata" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// SpanReaderPlugin_FindTracesClient is an autogenerated mock type for the SpanReaderPlugin_FindTracesClient type +type SpanReaderPlugin_FindTracesClient struct { + mock.Mock +} + +// CloseSend provides a mock function with given fields: +func (_m *SpanReaderPlugin_FindTracesClient) CloseSend() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Context provides a mock function with given fields: +func (_m *SpanReaderPlugin_FindTracesClient) Context() context.Context { + ret := _m.Called() + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// Header provides a mock function with given fields: +func (_m *SpanReaderPlugin_FindTracesClient) Header() (metadata.MD, error) { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Recv provides a mock function with given fields: +func (_m *SpanReaderPlugin_FindTracesClient) Recv() (*storage_v1.SpansResponseChunk, error) { + ret := _m.Called() + + var r0 *storage_v1.SpansResponseChunk + if rf, ok := ret.Get(0).(func() *storage_v1.SpansResponseChunk); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.SpansResponseChunk) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RecvMsg provides a mock function with given fields: m +func (_m *SpanReaderPlugin_FindTracesClient) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendMsg provides a mock function with given fields: m +func (_m *SpanReaderPlugin_FindTracesClient) SendMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Trailer provides a mock function with given fields: +func (_m *SpanReaderPlugin_FindTracesClient) Trailer() metadata.MD { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + return r0 +} diff --git a/proto-gen/storage_v1/mocks/SpanReaderPlugin_FindTracesServer.go b/proto-gen/storage_v1/mocks/SpanReaderPlugin_FindTracesServer.go new file mode 100644 index 00000000000..a8a053ebbb9 --- /dev/null +++ b/proto-gen/storage_v1/mocks/SpanReaderPlugin_FindTracesServer.go @@ -0,0 +1,104 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import metadata "google.golang.org/grpc/metadata" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// SpanReaderPlugin_FindTracesServer is an autogenerated mock type for the SpanReaderPlugin_FindTracesServer type +type SpanReaderPlugin_FindTracesServer struct { + mock.Mock +} + +// Context provides a mock function with given fields: +func (_m *SpanReaderPlugin_FindTracesServer) Context() context.Context { + ret := _m.Called() + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// RecvMsg provides a mock function with given fields: m +func (_m *SpanReaderPlugin_FindTracesServer) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Send provides a mock function with given fields: _a0 +func (_m *SpanReaderPlugin_FindTracesServer) Send(_a0 *storage_v1.SpansResponseChunk) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*storage_v1.SpansResponseChunk) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendHeader provides a mock function with given fields: _a0 +func (_m *SpanReaderPlugin_FindTracesServer) SendHeader(_a0 metadata.MD) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(metadata.MD) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendMsg provides a mock function with given fields: m +func (_m *SpanReaderPlugin_FindTracesServer) SendMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetHeader provides a mock function with given fields: _a0 +func (_m *SpanReaderPlugin_FindTracesServer) SetHeader(_a0 metadata.MD) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(metadata.MD) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetTrailer provides a mock function with given fields: _a0 +func (_m *SpanReaderPlugin_FindTracesServer) SetTrailer(_a0 metadata.MD) { + _m.Called(_a0) +} diff --git a/proto-gen/storage_v1/mocks/SpanReaderPlugin_GetTraceClient.go b/proto-gen/storage_v1/mocks/SpanReaderPlugin_GetTraceClient.go new file mode 100644 index 00000000000..fca0337a316 --- /dev/null +++ b/proto-gen/storage_v1/mocks/SpanReaderPlugin_GetTraceClient.go @@ -0,0 +1,133 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import metadata "google.golang.org/grpc/metadata" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// SpanReaderPlugin_GetTraceClient is an autogenerated mock type for the SpanReaderPlugin_GetTraceClient type +type SpanReaderPlugin_GetTraceClient struct { + mock.Mock +} + +// CloseSend provides a mock function with given fields: +func (_m *SpanReaderPlugin_GetTraceClient) CloseSend() error { + ret := _m.Called() + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Context provides a mock function with given fields: +func (_m *SpanReaderPlugin_GetTraceClient) Context() context.Context { + ret := _m.Called() + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// Header provides a mock function with given fields: +func (_m *SpanReaderPlugin_GetTraceClient) Header() (metadata.MD, error) { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Recv provides a mock function with given fields: +func (_m *SpanReaderPlugin_GetTraceClient) Recv() (*storage_v1.SpansResponseChunk, error) { + ret := _m.Called() + + var r0 *storage_v1.SpansResponseChunk + if rf, ok := ret.Get(0).(func() *storage_v1.SpansResponseChunk); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.SpansResponseChunk) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// RecvMsg provides a mock function with given fields: m +func (_m *SpanReaderPlugin_GetTraceClient) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendMsg provides a mock function with given fields: m +func (_m *SpanReaderPlugin_GetTraceClient) SendMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Trailer provides a mock function with given fields: +func (_m *SpanReaderPlugin_GetTraceClient) Trailer() metadata.MD { + ret := _m.Called() + + var r0 metadata.MD + if rf, ok := ret.Get(0).(func() metadata.MD); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(metadata.MD) + } + } + + return r0 +} diff --git a/proto-gen/storage_v1/mocks/SpanReaderPlugin_GetTraceServer.go b/proto-gen/storage_v1/mocks/SpanReaderPlugin_GetTraceServer.go new file mode 100644 index 00000000000..a28a1b073e7 --- /dev/null +++ b/proto-gen/storage_v1/mocks/SpanReaderPlugin_GetTraceServer.go @@ -0,0 +1,104 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import metadata "google.golang.org/grpc/metadata" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// SpanReaderPlugin_GetTraceServer is an autogenerated mock type for the SpanReaderPlugin_GetTraceServer type +type SpanReaderPlugin_GetTraceServer struct { + mock.Mock +} + +// Context provides a mock function with given fields: +func (_m *SpanReaderPlugin_GetTraceServer) Context() context.Context { + ret := _m.Called() + + var r0 context.Context + if rf, ok := ret.Get(0).(func() context.Context); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(context.Context) + } + } + + return r0 +} + +// RecvMsg provides a mock function with given fields: m +func (_m *SpanReaderPlugin_GetTraceServer) RecvMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Send provides a mock function with given fields: _a0 +func (_m *SpanReaderPlugin_GetTraceServer) Send(_a0 *storage_v1.SpansResponseChunk) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*storage_v1.SpansResponseChunk) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendHeader provides a mock function with given fields: _a0 +func (_m *SpanReaderPlugin_GetTraceServer) SendHeader(_a0 metadata.MD) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(metadata.MD) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SendMsg provides a mock function with given fields: m +func (_m *SpanReaderPlugin_GetTraceServer) SendMsg(m interface{}) error { + ret := _m.Called(m) + + var r0 error + if rf, ok := ret.Get(0).(func(interface{}) error); ok { + r0 = rf(m) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetHeader provides a mock function with given fields: _a0 +func (_m *SpanReaderPlugin_GetTraceServer) SetHeader(_a0 metadata.MD) error { + ret := _m.Called(_a0) + + var r0 error + if rf, ok := ret.Get(0).(func(metadata.MD) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// SetTrailer provides a mock function with given fields: _a0 +func (_m *SpanReaderPlugin_GetTraceServer) SetTrailer(_a0 metadata.MD) { + _m.Called(_a0) +} diff --git a/proto-gen/storage_v1/mocks/SpanWriterPluginClient.go b/proto-gen/storage_v1/mocks/SpanWriterPluginClient.go new file mode 100644 index 00000000000..fdc4e3c5fe2 --- /dev/null +++ b/proto-gen/storage_v1/mocks/SpanWriterPluginClient.go @@ -0,0 +1,43 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import grpc "google.golang.org/grpc" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// SpanWriterPluginClient is an autogenerated mock type for the SpanWriterPluginClient type +type SpanWriterPluginClient struct { + mock.Mock +} + +// WriteSpan provides a mock function with given fields: ctx, in, opts +func (_m *SpanWriterPluginClient) WriteSpan(ctx context.Context, in *storage_v1.WriteSpanRequest, opts ...grpc.CallOption) (*storage_v1.WriteSpanResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *storage_v1.WriteSpanResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.WriteSpanRequest, ...grpc.CallOption) *storage_v1.WriteSpanResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.WriteSpanResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.WriteSpanRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/proto-gen/storage_v1/mocks/SpanWriterPluginServer.go b/proto-gen/storage_v1/mocks/SpanWriterPluginServer.go new file mode 100644 index 00000000000..f9c98750de5 --- /dev/null +++ b/proto-gen/storage_v1/mocks/SpanWriterPluginServer.go @@ -0,0 +1,35 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import context "context" +import mock "github.com/stretchr/testify/mock" +import storage_v1 "github.com/jaegertracing/jaeger/proto-gen/storage_v1" + +// SpanWriterPluginServer is an autogenerated mock type for the SpanWriterPluginServer type +type SpanWriterPluginServer struct { + mock.Mock +} + +// WriteSpan provides a mock function with given fields: _a0, _a1 +func (_m *SpanWriterPluginServer) WriteSpan(_a0 context.Context, _a1 *storage_v1.WriteSpanRequest) (*storage_v1.WriteSpanResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *storage_v1.WriteSpanResponse + if rf, ok := ret.Get(0).(func(context.Context, *storage_v1.WriteSpanRequest) *storage_v1.WriteSpanResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*storage_v1.WriteSpanResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *storage_v1.WriteSpanRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +}