From 75870c6abccf3b4a22189cbbdc4dcc9e4748bd4e Mon Sep 17 00:00:00 2001 From: Jun Guo Date: Tue, 19 Nov 2019 15:03:51 -0500 Subject: [PATCH] Create new cassandra table "operation_names_v2" with "spanKind" column for operation name index - add migration script - read from the latest table if available, otherwise fail back to previous table Signed-off-by: Jun Guo --- plugin/storage/cassandra/factory_test.go | 2 +- .../cassandra/schema/migration/V002toV003.sh | 100 +++++++++ plugin/storage/cassandra/schema/v003.cql.tmpl | 204 ++++++++++++++++++ .../cassandra/spanstore/operation_names.go | 149 +++++++++++-- .../spanstore/operation_names_test.go | 104 ++++++--- .../cassandra/spanstore/reader_test.go | 3 + .../cassandra/spanstore/writer_test.go | 4 + 7 files changed, 511 insertions(+), 55 deletions(-) create mode 100644 plugin/storage/cassandra/schema/migration/V002toV003.sh create mode 100644 plugin/storage/cassandra/schema/v003.cql.tmpl diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 6671203b1878..2b739a71b3a5 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -91,7 +91,7 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateArchiveSpanWriter() assert.EqualError(t, err, "archive storage not configured") - f.archiveConfig = &mockSessionBuilder{} + f.archiveConfig = newMockSessionBuilder(session, nil) assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err = f.CreateArchiveSpanReader() diff --git a/plugin/storage/cassandra/schema/migration/V002toV003.sh b/plugin/storage/cassandra/schema/migration/V002toV003.sh new file mode 100644 index 000000000000..9d53b06ca079 --- /dev/null +++ b/plugin/storage/cassandra/schema/migration/V002toV003.sh @@ -0,0 +1,100 @@ +#!/usr/bin/env bash + +# Create a new operation_names_v2 table and copy all data from operation_names table +# Sample usage: KEYSPACE=jaeger_v1_test TIMEOUT=1000 ./plugin/storage/cassandra/schema/migration/v002tov003.sh + +set -euo pipefail + +function usage { + >&2 echo "Error: $1" + >&2 echo "" + >&2 echo "Usage: KEYSPACE={keyspace} TTL={ttl} $0" + >&2 echo "" + >&2 echo "The following parameters can be set via environment:" + >&2 echo " KEYSPACE - keyspace" + >&2 echo "" + exit 1 +} + +confirm() { + read -r -p "${1:-Continue? [y/N]} " response + case "$response" in + [yY][eE][sS]|[yY]) + true + ;; + *) + exit 1 + ;; + esac +} + +if [[ ${KEYSPACE} == "" ]]; then + usage "missing KEYSPACE parameter" +fi + +if [[ ${KEYSPACE} =~ [^a-zA-Z0-9_] ]]; then + usage "invalid characters in KEYSPACE=$KEYSPACE parameter, please use letters, digits or underscores" +fi + +keyspace=${KEYSPACE} +old_table=operation_names +new_table=operation_names_v2 +cqlsh_cmd=cqlsh + +row_count=$(${cqlsh_cmd} -e "select count(*) from $keyspace.$old_table;"|head -4|tail -1| tr -d ' ') + +echo "About to copy $row_count rows to new table..." + +confirm + +${cqlsh_cmd} -e "COPY $keyspace.$old_table (service_name, operation_name) to '$old_table.csv';" + +if [[ ! -f ${old_table}.csv ]]; then + echo "Could not find $old_table.csv. Backup from cassandra was probably not successful" + exit 1 +fi + +csv_rows=$(wc -l ${old_table}.csv | tr -dc '0-9') + +if [[ ${row_count} -ne ${csv_rows} ]]; then + echo "Number of rows: $csv_rows in file is not equal to number of rows: $row_count in cassandra" + exit 1 +fi + +echo "Generating data for new table..." +while IFS="," read service_name operation_name; do + echo "$service_name,,$operation_name" +done < ${old_table}.csv > ${new_table}.csv + +ttl=$(${cqlsh_cmd} -e "select default_time_to_live from system_schema.tables WHERE keyspace_name='$keyspace' AND table_name='$old_table';"|head -4|tail -1|tr -d ' ') + +echo "Creating new table $new_table with ttl: $ttl" + +${cqlsh_cmd} -e "CREATE TABLE IF NOT EXISTS $keyspace.$new_table ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = $ttl + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800;" + +echo "Import data to new table: $keyspace.$new_table from $new_table.csv" + +# empty string will be inserted as empty string instead of null +${cqlsh_cmd} -e "COPY $keyspace.$new_table (service_name, span_kind, operation_name) + FROM '$new_table.csv' + WITH NULL='NIL';" + +echo "Data from old table are successfully imported to new table!" + +echo "Before finish, do you want to delete old table: $keyspace.$old_table?" +confirm +${cqlsh_cmd} -e "DROP TABLE IF EXISTS $keyspace.$old_table;" \ No newline at end of file diff --git a/plugin/storage/cassandra/schema/v003.cql.tmpl b/plugin/storage/cassandra/schema/v003.cql.tmpl new file mode 100644 index 000000000000..b1b664d3dd0b --- /dev/null +++ b/plugin/storage/cassandra/schema/v003.cql.tmpl @@ -0,0 +1,204 @@ +-- +-- Creates Cassandra keyspace with tables for traces and dependencies. +-- +-- Required parameters: +-- +-- keyspace +-- name of the keyspace +-- replication +-- replication strategy for the keyspace, such as +-- for prod environments +-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } +-- for test environments +-- {'class': 'SimpleStrategy', 'replication_factor': '1'} +-- trace_ttl +-- default time to live for trace data, in seconds +-- dependencies_ttl +-- default time to live for dependencies data, in seconds (0 for no TTL) +-- +-- Non-configurable settings: +-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ +-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html + +CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; + +CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.log ( + ts bigint, + fields list>, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.process ( + service_name text, + tags list>, +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". +-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, + duration bigint, + tags list>, + logs list>, + refs list>, + process frozen, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. +CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( + service_name text, + bucket int, + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( + service_name text, // service name + operation_name text, // operation name, or blank for queries without span name + bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour + duration bigint, // span duration, in microseconds + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( + parent text, + child text, + call_count bigint, + source text, +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${dependencies_ttl}; \ No newline at end of file diff --git a/plugin/storage/cassandra/spanstore/operation_names.go b/plugin/storage/cassandra/spanstore/operation_names.go index de73cf4347e6..34282e6be5d9 100644 --- a/plugin/storage/cassandra/spanstore/operation_names.go +++ b/plugin/storage/cassandra/spanstore/operation_names.go @@ -16,6 +16,7 @@ package spanstore import ( + "fmt" "time" "github.com/pkg/errors" @@ -25,23 +26,51 @@ import ( "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/cassandra" casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics" + "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( - insertOperationName = `INSERT INTO operation_names(service_name, operation_name) VALUES (?, ?)` - queryOperationNames = `SELECT operation_name FROM operation_names WHERE service_name = ?` + // LatestVersion latest version of operation_names table schema, increase the version if your table schema changes require code change + LatestVersion = 1 + // TableQueryStmt the query statement used to check if a table exists or not + TableQueryStmt = "SELECT * from ? limit 1" ) +type schemaMeta struct { + TableName string + InsertStmt string + QueryByKindStmt string + QueryStmt string +} + +var schemas = []schemaMeta{ + { + TableName: "operation_names", + InsertStmt: "INSERT INTO %s(service_name, operation_name) VALUES (?, ?)", + QueryByKindStmt: "SELECT operation_name FROM %s WHERE service_name = ?", + QueryStmt: "SELECT operation_name FROM %s WHERE service_name = ?", + }, + { + TableName: "operation_names_v2", + InsertStmt: "INSERT INTO %s(service_name, span_kind, operation_name) VALUES (?, ?, ?)", + QueryByKindStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ? AND span_kind = ?", + QueryStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ?", + }, +} + // OperationNamesStorage stores known operation names by service. type OperationNamesStorage struct { // CQL statements are public so that Cassandra2 storage can override them - InsertStmt string - QueryStmt string - session cassandra.Session - writeCacheTTL time.Duration - metrics *casMetrics.Table - operationNames cache.Cache - logger *zap.Logger + SchemaVersion int + TableName string + InsertStmt string + QueryStmt string + QueryByKindStmt string + session cassandra.Session + writeCacheTTL time.Duration + metrics *casMetrics.Table + operationNames cache.Cache + logger *zap.Logger } // NewOperationNamesStorage returns a new OperationNamesStorage @@ -51,13 +80,23 @@ func NewOperationNamesStorage( metricsFactory metrics.Factory, logger *zap.Logger, ) *OperationNamesStorage { + + schemaVersion := LatestVersion + + for schemaVersion > 0 && !tableExist(session, schemas[schemaVersion].TableName) { + schemaVersion = schemaVersion - 1 + } + return &OperationNamesStorage{ - session: session, - InsertStmt: insertOperationName, - QueryStmt: queryOperationNames, - metrics: casMetrics.NewTable(metricsFactory, "operation_names"), - writeCacheTTL: writeCacheTTL, - logger: logger, + session: session, + TableName: schemas[schemaVersion].TableName, + SchemaVersion: schemaVersion, + InsertStmt: fmt.Sprintf(schemas[schemaVersion].InsertStmt, schemas[schemaVersion].TableName), + QueryByKindStmt: fmt.Sprintf(schemas[schemaVersion].QueryByKindStmt, schemas[schemaVersion].TableName), + QueryStmt: fmt.Sprintf(schemas[schemaVersion].QueryStmt, schemas[schemaVersion].TableName), + metrics: casMetrics.NewTable(metricsFactory, schemas[schemaVersion].TableName), + writeCacheTTL: writeCacheTTL, + logger: logger, operationNames: cache.NewLRUWithOptions( 100000, &cache.Options{ @@ -67,12 +106,27 @@ func NewOperationNamesStorage( } } +func tableExist(session cassandra.Session, tableName string) bool { + query := session.Query(TableQueryStmt, tableName) + err := query.Exec() + return err != nil +} + // Write saves Operation and Service name tuples func (s *OperationNamesStorage) Write(serviceName string, operationName string) error { var err error + //TODO: take spanKind from args + spanKind := "" query := s.session.Query(s.InsertStmt) - if inCache := checkWriteCache(serviceName+"|"+operationName, s.operationNames, s.writeCacheTTL); !inCache { - q := query.Bind(serviceName, operationName) + if inCache := checkWriteCache(serviceName+"|"+spanKind+"|"+operationName, s.operationNames, s.writeCacheTTL); !inCache { + var q cassandra.Query + switch s.SchemaVersion { + case 1: + q = query.Bind(serviceName, spanKind, operationName) + case 0: + q = query.Bind(serviceName, operationName) + } + err2 := s.metrics.Exec(q, s.logger) if err2 != nil { err = err2 @@ -83,16 +137,73 @@ func (s *OperationNamesStorage) Write(serviceName string, operationName string) // GetOperations returns all operations for a specific service traced by Jaeger func (s *OperationNamesStorage) GetOperations(service string) ([]string, error) { + var operations []*spanstore.Operation + var err error + + switch s.SchemaVersion { + case 1: + operations, err = getOperationsV1(s, &spanstore.OperationQueryParameters{ + ServiceName: service, + }) + case 0: + operations, err = getOperationsV0(s, service) + } + + if err != nil { + return nil, err + } + operationNames := make([]string, len(operations)) + for idx, operation := range operations { + operationNames[idx] = operation.Name + } + return operationNames, err +} + +func getOperationsV0(s *OperationNamesStorage, service string) ([]*spanstore.Operation, error) { iter := s.session.Query(s.QueryStmt, service).Iter() var operation string - var operations []string + var operationNames []string for iter.Scan(&operation) { - operations = append(operations, operation) + operationNames = append(operationNames, operation) } if err := iter.Close(); err != nil { err = errors.Wrap(err, "Error reading operation_names from storage") return nil, err } + + operations := make([]*spanstore.Operation, len(operationNames)) + for idx, name := range operationNames { + operations[idx] = &spanstore.Operation{ + Name: name, + } + } + return operations, nil +} + +func getOperationsV1(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) { + var casQuery cassandra.Query + if query.SpanKind == "" { + // Get operations for all spanKind + casQuery = s.session.Query(s.QueryStmt, query.ServiceName) + } else { + // Get operations for given spanKind + casQuery = s.session.Query(s.QueryByKindStmt, query.ServiceName, query.SpanKind) + } + iter := casQuery.Iter() + + var operationName string + var spanKind string + var operations []*spanstore.Operation + for iter.Scan(&spanKind, &operationName) { + operations = append(operations, &spanstore.Operation{ + Name: operationName, + SpanKind: spanKind, + }) + } + if err := iter.Close(); err != nil { + err = errors.Wrap(err, fmt.Sprintf("Error reading %s from storage", s.TableName)) + return nil, err + } return operations, nil } diff --git a/plugin/storage/cassandra/spanstore/operation_names_test.go b/plugin/storage/cassandra/spanstore/operation_names_test.go index ca8357542f80..65c6de73b124 100644 --- a/plugin/storage/cassandra/spanstore/operation_names_test.go +++ b/plugin/storage/cassandra/spanstore/operation_names_test.go @@ -30,6 +30,12 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" ) +type test struct { + ttl time.Duration + schemaVersion int + expErr error +} + type operationNameStorageTest struct { session *mocks.Session writeCacheTTL time.Duration @@ -39,10 +45,18 @@ type operationNameStorageTest struct { storage *OperationNamesStorage } -func withOperationNamesStorage(writeCacheTTL time.Duration, fn func(s *operationNameStorageTest)) { +func withOperationNamesStorage(writeCacheTTL time.Duration, schemaVersion int, fn func(s *operationNameStorageTest)) { session := &mocks.Session{} logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) + query := &mocks.Query{} + session.On("Query", TableQueryStmt, mock.Anything).Return(query) + if schemaVersion == LatestVersion { + query.On("Exec").Return(errors.New("new table does not exist")) + } else { + query.On("Exec").Return(nil) + } + s := &operationNameStorageTest{ session: session, writeCacheTTL: writeCacheTTL, @@ -55,38 +69,49 @@ func withOperationNamesStorage(writeCacheTTL time.Duration, fn func(s *operation } func TestOperationNamesStorageWrite(t *testing.T) { - for _, ttl := range []time.Duration{0, time.Minute} { - writeCacheTTL := ttl // capture loop var - t.Run(fmt.Sprintf("writeCacheTTL=%v", writeCacheTTL), func(t *testing.T) { - withOperationNamesStorage(writeCacheTTL, func(s *operationNameStorageTest) { + for _, test := range []test{ + {0, 0, nil}, + {time.Minute, 0, nil}, + {0, 1, nil}, + {time.Minute, 1, nil}, + } { + writeCacheTTL := test.ttl // capture loop var + t.Run(fmt.Sprintf("test %#v", test), func(t *testing.T) { + withOperationNamesStorage(writeCacheTTL, test.schemaVersion, func(s *operationNameStorageTest) { var execError = errors.New("exec error") query := &mocks.Query{} query1 := &mocks.Query{} query2 := &mocks.Query{} - query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) - query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) + + if test.schemaVersion == 0 { + query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) + } else { + query.On("Bind", []interface{}{"service-a", "", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "", "operation-d"}).Return(query2) + } + query1.On("Exec").Return(nil) query2.On("Exec").Return(execError) - query2.On("String").Return("select from operation_names") + query2.On("String").Return("select from " + schemas[test.schemaVersion].TableName) - var emptyArgs []interface{} - s.session.On("Query", mock.AnythingOfType("string"), emptyArgs).Return(query) + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) err := s.storage.Write("service-a", "Operation-b") assert.NoError(t, err) err = s.storage.Write("service-c", "operation-d") - assert.EqualError(t, err, "failed to Exec query 'select from operation_names': exec error") + assert.EqualError(t, err, "failed to Exec query 'select from "+schemas[test.schemaVersion].TableName+"': exec error") assert.Equal(t, map[string]string{ "level": "error", "msg": "Failed to exec query", - "query": "select from operation_names", + "query": "select from " + schemas[test.schemaVersion].TableName, "error": "exec error", }, s.logBuffer.JSONLine(0)) counts, _ := s.metricsFactory.Snapshot() assert.Equal(t, map[string]int64{ - "attempts|table=operation_names": 2, "inserts|table=operation_names": 1, "errors|table=operation_names": 1, + "attempts|table=" + schemas[test.schemaVersion].TableName: 2, "inserts|table=" + schemas[test.schemaVersion].TableName: 1, "errors|table=" + schemas[test.schemaVersion].TableName: 1, }, counts, "after first two writes") // write again @@ -97,8 +122,8 @@ func TestOperationNamesStorageWrite(t *testing.T) { expCounts := counts if writeCacheTTL == 0 { // without write cache, the second write must succeed - expCounts["attempts|table=operation_names"]++ - expCounts["inserts|table=operation_names"]++ + expCounts["attempts|table="+schemas[test.schemaVersion].TableName]++ + expCounts["inserts|table="+schemas[test.schemaVersion].TableName]++ } assert.Equal(t, expCounts, counts2) }) @@ -118,28 +143,37 @@ func TestOperationNamesStorageGetServices(t *testing.T) { return true }) matchEverything := mock.MatchedBy(func(v []interface{}) bool { return true }) - for _, expErr := range []error{nil, scanError} { - withOperationNamesStorage(writeCacheTTL, func(s *operationNameStorageTest) { - iter := &mocks.Iterator{} - iter.On("Scan", matchOnce).Return(true) - iter.On("Scan", matchEverything).Return(false) // false to stop the loop - iter.On("Close").Return(expErr) + for _, test := range []test{ + {0, 0, nil}, + {0, 0, scanError}, + {0, 1, nil}, + {0, 1, scanError}, + } { + t.Run(fmt.Sprintf("test %#v", test), func(t *testing.T) { + withOperationNamesStorage(writeCacheTTL, test.schemaVersion, func(s *operationNameStorageTest) { + iter := &mocks.Iterator{} + iter.On("Scan", matchOnce).Return(true) + iter.On("Scan", matchEverything).Return(false) // false to stop the loop + iter.On("Close").Return(test.expErr) - query := &mocks.Query{} - query.On("Iter").Return(iter) - - s.session.On("Query", mock.AnythingOfType("string"), []interface{}{"service-a"}).Return(query) - - services, err := s.storage.GetOperations("service-a") - if expErr == nil { - assert.NoError(t, err) - // expect empty string because mock iter.Scan(&placeholder) does not write to `placeholder` - assert.Equal(t, []string{""}, services) - } else { - assert.EqualError(t, err, "Error reading operation_names from storage: "+expErr.Error()) - } + query := &mocks.Query{} + query.On("Iter").Return(iter) + + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) + services, err := s.storage.GetOperations("service-a") + if test.expErr == nil { + assert.NoError(t, err) + if test.schemaVersion == 0 { + // expect one empty operation result because mock iter.Scan(&placeholder) does not write to `placeholder` + assert.Equal(t, []string{""}, services) + } else { + assert.Equal(t, []string{}, services) + } + } else { + assert.EqualError(t, err, fmt.Sprintf("Error reading %s from storage: %s", schemas[test.schemaVersion].TableName, test.expErr.Error())) + } + }) }) - } } diff --git a/plugin/storage/cassandra/spanstore/reader_test.go b/plugin/storage/cassandra/spanstore/reader_test.go index 5f60eeff361c..0ebce86ccc28 100644 --- a/plugin/storage/cassandra/spanstore/reader_test.go +++ b/plugin/storage/cassandra/spanstore/reader_test.go @@ -45,6 +45,9 @@ type spanReaderTest struct { func withSpanReader(fn func(r *spanReaderTest)) { session := &mocks.Session{} + query := &mocks.Query{} + session.On("Query", TableQueryStmt, mock.Anything).Return(query) + query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) r := &spanReaderTest{ diff --git a/plugin/storage/cassandra/spanstore/writer_test.go b/plugin/storage/cassandra/spanstore/writer_test.go index e911d117bae1..5ac377543747 100644 --- a/plugin/storage/cassandra/spanstore/writer_test.go +++ b/plugin/storage/cassandra/spanstore/writer_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/uber/jaeger-lib/metrics/metricstest" "go.uber.org/zap" @@ -42,6 +43,9 @@ type spanWriterTest struct { func withSpanWriter(writeCacheTTL time.Duration, fn func(w *spanWriterTest), options ...Option, ) { session := &mocks.Session{} + query := &mocks.Query{} + session.On("Query", TableQueryStmt, mock.Anything).Return(query) + query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) w := &spanWriterTest{