Skip to content

Commit

Permalink
gRPC Storage Service (grafana#2220)
Browse files Browse the repository at this point in the history
* gRPC Storage Service

This PR contains the gRPC service clients, i.e.
1. Index Client
2. Storage Client
3. Table Client

Signed-off-by: vineeth <vineethpothulapati@outlook.com>

* Fixed the spaces in imports.

Signed-off-by: Vineeth <vineethpothulapati@outlook.com>

* Made changes around review comments

Signed-off-by: Vineeth Pothulapati <vineethpothulapati@outlook.com>

* Address the rpc message change from TableDesc to CreateTableRequest & added the logic to terminate connection on Stop() invocation.

Signed-off-by: Vineeth Pothulapati <vineethpothulapati@outlook.com>

* Added the changes in CHANGELOG.md & added information around rpc calls in grpc.proto

Signed-off-by: Vineeth Pothulapati <vineethpothulapati@outlook.com>

Co-authored-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
VineethReddy02 and pracucci authored May 29, 2020
1 parent 9767b49 commit d0b12c1
Show file tree
Hide file tree
Showing 9 changed files with 7,359 additions and 2 deletions.
6,481 changes: 6,481 additions & 0 deletions grpc/grpc.pb.go

Large diffs are not rendered by default.

142 changes: 142 additions & 0 deletions grpc/grpc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
syntax = "proto3";

package grpc;

import "google/protobuf/empty.proto";

// grpc_store is the service a gRPC-based chunk store server implements.
// It groups three logical clients: index-client, storage-client and
// table-client (see the section markers below).
service grpc_store {
/// index-client

/// WriteIndex writes batch of indexes to the index tables.
rpc WriteIndex(WriteIndexRequest) returns (google.protobuf.Empty);
/// QueryIndex reads the indexes required for given query & sends back the batch of rows
/// in rpc streams
rpc QueryIndex(QueryIndexRequest) returns (stream QueryIndexResponse);
/// DeleteIndex deletes the batch of index entries from the index tables
rpc DeleteIndex(DeleteIndexRequest) returns (google.protobuf.Empty);

/// storage-client

/// PutChunks saves the batch of chunks into the chunk tables.
rpc PutChunks(PutChunksRequest) returns (google.protobuf.Empty);
/// GetChunks requests for batch of chunks and the batch of chunks are sent back in rpc streams
/// batching needs to be performed at server level as per requirement instead of sending single chunk per stream.
/// In the GetChunks rpc request, send the chunk's encoded buffer as nil.
rpc GetChunks(GetChunksRequest) returns (stream GetChunksResponse);
/// DeleteChunks deletes the chunks based on chunkID.
rpc DeleteChunks(ChunkID) returns (google.protobuf.Empty);

/// table-client

/// Lists all the tables that exist in the database.
rpc ListTables(google.protobuf.Empty) returns (ListTablesResponse);
/// Creates a table with provided name & attributes.
rpc CreateTable(CreateTableRequest) returns (google.protobuf.Empty);
// Deletes a table using table name provided.
rpc DeleteTable(DeleteTableRequest) returns (google.protobuf.Empty);
// Describes a table information for the provided table.
rpc DescribeTable(DescribeTableRequest) returns (DescribeTableResponse);
// Update a table with newly provided table information.
rpc UpdateTable(UpdateTableRequest) returns (google.protobuf.Empty);
}

// PutChunksRequest carries the batch of chunks to write to the chunk tables.
message PutChunksRequest {
repeated Chunk chunks = 1;
}

// GetChunksRequest identifies the chunks to fetch (per the GetChunks rpc
// comment above, the encoded payload is sent as nil in the request).
message GetChunksRequest {
repeated Chunk chunks = 1;
}

// GetChunksResponse is one streamed batch of fetched chunks.
message GetChunksResponse {
repeated Chunk chunks = 1;
}

// Chunk is a single stored chunk: its encoded payload, its storage key and
// the table it lives in.
message Chunk {
bytes encoded = 1;
string key = 2;
string tableName = 3;
}

// ChunkID addresses a single chunk for deletion.
message ChunkID {
string chunkID = 1;
}

// DeleteTableRequest names the table to delete.
message DeleteTableRequest {
string tableName = 1;
}

// DescribeTableRequest names the table to describe.
message DescribeTableRequest {
string tableName = 1;
}

// WriteBatch groups index writes and deletes.
// NOTE(review): no rpc in this file takes WriteBatch directly — it appears to
// mirror a client-side type; confirm against the Go client before removing.
message WriteBatch {
repeated IndexEntry writes = 1;
repeated IndexEntry deletes = 2;
}

// WriteIndexRequest carries the index entries for WriteIndex.
message WriteIndexRequest {
repeated IndexEntry writes = 1;
}

// DeleteIndexRequest carries the index entries for DeleteIndex.
message DeleteIndexRequest {
repeated IndexEntry deletes = 1;
}

// QueryIndexResponse is one streamed batch of rows matching a QueryIndexRequest.
message QueryIndexResponse {
repeated Row rows = 1;
}

// Row is a single (rangeValue, value) pair of an index entry.
message Row {
bytes rangeValue = 1;
bytes value = 2;
}

// IndexEntry is a full index record: table, hash key, range key and value.
message IndexEntry {
string tableName = 1;
string hashValue = 2;
bytes rangeValue = 3;
bytes value = 4;
}

// QueryIndexRequest selects index rows by table and hash value, optionally
// narrowed by range-value prefix/start and an exact value match.
message QueryIndexRequest {
string tableName = 1;
string hashValue = 2;
bytes rangeValuePrefix = 3;
bytes rangeValueStart = 4;
bytes valueEqual = 5;
bool immutable = 6;
}

// UpdateTableRequest asks the server to move a table from its current
// description to the expected one.
message UpdateTableRequest {
TableDesc current = 1;
TableDesc expected = 2;
}

// DescribeTableResponse returns a table's description and whether it is active.
message DescribeTableResponse {
TableDesc desc = 1;
bool isActive = 2;
}

// CreateTableRequest carries the description of the table to create.
message CreateTableRequest {
TableDesc desc = 1;
}

// TableDesc describes a table: name, IO mode and provisioned throughput.
message TableDesc {
string name = 1;
bool useOnDemandIOMode = 2;
int64 provisionedRead = 3;
int64 provisionedWrite = 4;
map<string, string> tags = 5;
}

// ListTablesResponse returns the names of all existing tables.
message ListTablesResponse {
repeated string tableNames = 1;
}

// Labels is a single name/value label pair.
// NOTE(review): not referenced by any rpc or message in this file — confirm
// it is used elsewhere before keeping it.
message Labels {
string name = 1;
string value = 2;
}


34 changes: 34 additions & 0 deletions grpc/grpc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package grpc

import (
"flag"
"time"

"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)

// Config for a StorageClient
type Config struct {
	// Address is the hostname or IP of the gRPC store instance to dial
	// (set via the -grpc-store.server-address flag).
	Address string `yaml:"server_address,omitempty"`
}

// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
	// Default is empty: the address must be provided explicitly for the
	// grpc-store backend to be usable.
	f.StringVar(&cfg.Address, "grpc-store.server-address", "", "Hostname or IP of the gRPC store instance.")
}

// connectToGrpcServer dials the gRPC store at serverAddress and returns a
// ready-to-use client along with the underlying connection, which the caller
// is responsible for closing on shutdown.
func connectToGrpcServer(serverAddress string) (GrpcStoreClient, *grpc.ClientConn, error) {
	// Keep the connection alive with periodic pings, even when no streams
	// are open.
	kaParams := keepalive.ClientParameters{
		Time:                time.Second * 20,
		Timeout:             time.Minute * 10,
		PermitWithoutStream: true,
	}
	conn, err := grpc.Dial(
		serverAddress,
		grpc.WithKeepaliveParams(kaParams),
		grpc.WithInsecure(),
	)
	if err != nil {
		return nil, nil, errors.Wrapf(err, "failed to dial grpc-store %s", serverAddress)
	}
	return NewGrpcStoreClient(conn), conn, nil
}
180 changes: 180 additions & 0 deletions grpc/grpc_client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package grpc

import (
"context"
"testing"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/require"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/encoding"
prom_chunk "github.com/cortexproject/cortex/pkg/chunk/encoding"
)

// This includes test for all RPCs in
// tableClient, indexClient, storageClient
func TestGrpcStore(t *testing.T) {
	var err error
	cleanup, storeAddress := createTestGrpcServer(t)
	defer cleanup()
	cfg := Config{Address: storeAddress}
	schemaCfg := chunk.SchemaConfig{Configs: []chunk.PeriodConfig{
		{
			From:       chunk.DayTime{Time: 1564358400000},
			IndexType:  "grpc-store",
			ObjectType: "grpc-store",
			Schema:     "v10",
			IndexTables: chunk.PeriodicTableConfig{
				Prefix: "index_",
				Period: 604800000000000,
				Tags:   nil,
			},
			RowShards: 16,
		},
	}}

	// rpc calls specific to tableClient
	tableClient, _ := NewTestTableClient(cfg)
	tableDesc := chunk.TableDesc{
		Name:              "chunk_2607",
		UseOnDemandIOMode: false,
		ProvisionedRead:   300,
		ProvisionedWrite:  1,
		Tags:              nil,
	}
	err = tableClient.CreateTable(context.Background(), tableDesc)
	require.NoError(t, err)

	_, err = tableClient.ListTables(context.Background())
	require.NoError(t, err)

	_, _, err = tableClient.DescribeTable(context.Background(), "chunk_2591")
	require.NoError(t, err)

	currentTable := chunk.TableDesc{
		Name:              "chunk_2591",
		UseOnDemandIOMode: false,
		ProvisionedRead:   0,
		ProvisionedWrite:  0,
		Tags:              nil,
	}
	expectedTable := chunk.TableDesc{
		Name:              "chunk_2591",
		UseOnDemandIOMode: false,
		ProvisionedRead:   300,
		ProvisionedWrite:  1,
		Tags:              nil,
	}

	err = tableClient.UpdateTable(context.Background(), currentTable, expectedTable)
	require.NoError(t, err)

	err = tableClient.DeleteTable(context.Background(), "chunk_2591")
	require.NoError(t, err)

	// rpc calls for storageClient
	storageClient, _ := NewTestStorageClient(cfg, schemaCfg)

	// PutChunks and GetChunks use the same single-chunk fixture; build it
	// fresh for each call so neither mutates the other's input.
	err = storageClient.PutChunks(context.Background(), chunksTestData())
	require.NoError(t, err)

	_, err = storageClient.GetChunks(context.Background(), chunksTestData())
	require.NoError(t, err)

	err = storageClient.DeleteChunk(context.Background(), "")
	require.NoError(t, err)

	//rpc calls specific to indexClient
	writeBatchTestData := writeBatchTestData()
	err = storageClient.BatchWrite(context.Background(), writeBatchTestData)
	require.NoError(t, err)

	queries := []chunk.IndexQuery{
		{TableName: "table", HashValue: "foo"},
	}
	// NOTE(review): rows are counted but never asserted — the test only
	// checks that QueryPages completes without error. Consider asserting a
	// count once the test server's canned response is pinned down.
	results := 0
	err = storageClient.QueryPages(context.Background(), queries, func(query chunk.IndexQuery, batch chunk.ReadBatch) bool {
		iter := batch.Iterator()
		for iter.Next() {
			results++
		}
		return true
	})
	require.NoError(t, err)

}

// chunksTestData returns the single-chunk fixture shared by the PutChunks
// and GetChunks calls in TestGrpcStore.
func chunksTestData() []chunk.Chunk {
	return []chunk.Chunk{
		{
			Fingerprint: 15993187966453505842,
			UserID:      "fake",
			From:        1587997054298,
			Through:     1587997054298,
			Metric: labels.Labels{
				{
					Name:  "_name_",
					Value: "prometheus_sd_file_scan_duration_seconds_sum",
				},
				{
					Name:  "instance",
					Value: "localhost:9090",
				},
				{
					Name:  "job",
					Value: "prometheus",
				},
			},
			ChecksumSet: true,
			Checksum:    3651208117,
			Encoding:    encoding.Bigchunk,
			Data:        prom_chunk.New(),
		},
	}
}

// writeBatchTestData returns a fixed index write batch (one write, one
// delete) used to exercise the BatchWrite rpc in TestGrpcStore.
func writeBatchTestData() chunk.WriteBatch {
	// Named batch (not t) to avoid confusion with *testing.T in this
	// _test.go file.
	batch := &WriteBatch{
		Writes: []*IndexEntry{
			{
				TableName:  "index_2625",
				HashValue:  "fake:d18381:5f3DoSEa2cDzymQ7u8VZ6c/ku1HlYIdMWqdg1QKCYh4",
				RangeValue: []byte("JSI0YbyRLVmLKkLBiAKf5ctf8mWtn9U6CXCzuYmWkMk 5f3DoSEa2cDzymQ7u8VZ6c/ku1HlYIdMWqdg1QKCYh4 8"),
				Value:      []byte("localhost:9090"),
			},
		},
		Deletes: []*IndexEntry{
			{
				TableName:  "index_2625",
				HashValue:  "fake:d18381:5f3DoSEa2cDzymQ7u8VZ6c/ku1HlYIdMWqdg1QKCYh4",
				RangeValue: []byte("JSI0YbyRLVmLKkLBiAKf5ctf8mWtn9U6CXCzuYmWkMk 5f3DoSEa2cDzymQ7u8VZ6c/ku1HlYIdMWqdg1QKCYh4 8"),
				Value:      nil,
			},
		},
	}
	return batch
}
Loading

0 comments on commit d0b12c1

Please sign in to comment.