diff --git a/Makefile b/Makefile index 351af2658..b4d81ae5b 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,9 @@ deps: ####################### BUILD ####################### +build_balancer: + go build -pgo=auto -o spqr-balancer ./cmd/balancer + build_coorctl: go build -pgo=auto -o coorctl ./cmd/coordctl @@ -42,7 +45,7 @@ build_workloadreplay: build_spqrdump: go build -pgo=auto -o spqrdump ./cmd/spqrdump -build: build_coordinator build_coorctl build_router build_mover build_worldmock build_workloadreplay build_spqrdump +build: build_balancer build_coordinator build_coorctl build_router build_mover build_worldmock build_workloadreplay build_spqrdump build_images: docker compose build spqr-base-image @@ -59,7 +62,7 @@ save_shard_image: docker save ${IMAGE_SHARD} | gzip -c > ${CACHE_FILE_SHARD};\ clean: - rm -f spqr-router spqr-coordinator spqr-mover spqr-worldmock + rm -f spqr-router spqr-coordinator spqr-mover spqr-worldmock spqr-balancer make clean_feature_test ######################## RUN ######################## diff --git a/balancer/app/app.go b/balancer/app/app.go new file mode 100644 index 000000000..a21df1ed7 --- /dev/null +++ b/balancer/app/app.go @@ -0,0 +1,31 @@ +package app + +import ( + "context" + "github.com/pg-sharding/spqr/balancer" + "github.com/pg-sharding/spqr/pkg/config" + "github.com/pg-sharding/spqr/pkg/spqrlog" + "time" +) + +type App struct { + balancer balancer.Balancer +} + +func NewApp(b balancer.Balancer) *App { + return &App{ + balancer: b, + } +} + +func (app *App) Run() error { + if err := spqrlog.UpdateZeroLogLevel(config.BalancerConfig().LogLevel); err != nil { + return err + } + spqrlog.Zero.Info().Msg("running balancer") + + ctx, cancel := context.WithTimeout(context.TODO(), time.Duration(config.BalancerConfig().TimeoutSec)*time.Second) + defer cancel() + app.balancer.RunBalancer(ctx) + return nil +} diff --git a/balancer/balancer.go b/balancer/balancer.go new file mode 100644 index 000000000..9e00f0a36 --- /dev/null +++ b/balancer/balancer.go @@ -0,0 +1,7 @@ +package balancer + +import "context" + +type Balancer interface { + RunBalancer(ctx context.Context) +} diff --git a/balancer/provider/balancer.go b/balancer/provider/balancer.go new file mode 100644 index 000000000..a297816e8 --- /dev/null +++ b/balancer/provider/balancer.go @@ -0,0 +1,686 @@ +package provider + +import ( + "context" + "fmt" + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/pg-sharding/spqr/balancer" + "github.com/pg-sharding/spqr/pkg/config" + "github.com/pg-sharding/spqr/pkg/models/distributions" + "github.com/pg-sharding/spqr/pkg/models/kr" + "github.com/pg-sharding/spqr/pkg/models/tasks" + protos "github.com/pg-sharding/spqr/pkg/protos" + "github.com/pg-sharding/spqr/pkg/spqrlog" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "sort" + "strings" +) + +type BalancerImpl struct { + coordinatorConn *grpc.ClientConn + threshold []float64 + + keyRanges []*kr.KeyRange + krIdx map[string]int + shardKr map[string][]int +} + +func NewBalancer() (*BalancerImpl, error) { + threshold := make([]float64, 2*metricsCount) + configThresholds := []float64{config.BalancerConfig().CpuThreshold, config.BalancerConfig().SpaceThreshold} + for i := 0; i < metricsCount; i++ { + threshold[i] = configThresholds[i] + threshold[metricsCount+i] = configThresholds[i] + } + conn, err := grpc.Dial(config.BalancerConfig().CoordinatorAddress, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, err + } + return &BalancerImpl{ + 
coordinatorConn: conn, + threshold: threshold, + keyRanges: []*kr.KeyRange{}, + krIdx: map[string]int{}, + shardKr: map[string][]int{}, + }, nil +} + +var _ balancer.Balancer = &BalancerImpl{} + +func (b *BalancerImpl) RunBalancer(ctx context.Context) { + // TODO: add command to drop task group to coordinator + taskGroup, err := b.getCurrentTaskGroupFromQDB(ctx) + if err != nil { + spqrlog.Zero.Error().Err(err).Msg("error getting current tasks") + return + } + if taskGroup == nil || len(taskGroup.Tasks) == 0 { + taskGroup, err = b.generateTasks(ctx) + if err != nil { + spqrlog.Zero.Error().Err(err).Msg("error planning tasks") + return + } + if len(taskGroup.Tasks) == 0 { + spqrlog.Zero.Debug().Msg("Nothing to execute") + return + } + if err := b.syncTaskGroupWithQDB(ctx, taskGroup); err != nil { + spqrlog.Zero.Error().Err(err).Msg("error inserting tasks") + return + } + } + if err := b.executeTasks(ctx, taskGroup); err != nil { + spqrlog.Zero.Error().Err(err).Msg("error executing tasks") + } +} + +func (b *BalancerImpl) generateTasks(ctx context.Context) (*tasks.TaskGroup, error) { + shardsServiceClient := protos.NewShardServiceClient(b.coordinatorConn) + r, err := shardsServiceClient.ListShards(ctx, &protos.ListShardsRequest{}) + if err != nil { + return nil, err + } + shardToState := make(map[string]*ShardMetrics) + shardStates := make([]*ShardMetrics, 0) + spqrlog.Zero.Debug().Int("shards count", len(r.Shards)).Msg("got shards from coordinator") + for _, shard := range r.Shards { + state, err := b.getShardCurrentState(ctx, shard) + if err != nil { + return nil, err + } + shardToState[shard.Id] = state + shardStates = append(shardStates, state) + } + + maxMetric, criterion := b.getCriterion(shardStates) + sort.Slice(shardStates, func(i, j int) bool { + return shardStates[i].MetricsTotal[criterion] > shardStates[j].MetricsTotal[criterion] + }) + + spqrlog.Zero.Debug().Float64("metric", maxMetric).Int("criterion", criterion).Msg("Max metric") + + if maxMetric <= 1 { + spqrlog.Zero.Debug().Msg("Metrics below the threshold, exiting") + return &tasks.TaskGroup{}, nil + } + + if err = b.updateKeyRanges(ctx); err != nil { + return nil, fmt.Errorf("error updating key range info: %s", err) + } + + if err = b.getStatsByKeyRange(ctx, shardStates[0]); err != nil { + return nil, fmt.Errorf("error getting detailed stats: %s", err) + } + + // determine most loaded key range + shardFrom := shardStates[0] + + kRLoad, krId := b.getMostLoadedKR(shardFrom, criterion) + + meanKeyLoad := kRLoad / float64(shardFrom.KeyCountKR[krId]) + keyCount := int((shardFrom.MetricsTotal[criterion] - b.threshold[criterion]) / meanKeyLoad) + // do not move more keys than there are in the key range + keyCount = min(keyCount, int(shardFrom.KeyCountKR[krId])) + + // determine where to move keys to + shId, ok := b.getShardToMoveTo(shardStates, shardToState, krId, shardFrom.ShardId, keyCount) + + if !ok { + shId, keyCount = b.moveMaxPossible(shardStates, shardToState, krId, shardFrom.ShardId) + if keyCount < 0 { + return nil, fmt.Errorf("could not find shard to move keys to") + } + } + + if keyCount == 0 { + return &tasks.TaskGroup{Tasks: []*tasks.Task{}}, nil + } + return b.getTasks(ctx, shardFrom, krId, shId, keyCount) +} + +func (b *BalancerImpl) getShardCurrentState(ctx context.Context, shard *protos.Shard) (*ShardMetrics, error) { + spqrlog.Zero.Debug().Str("shard id", shard.Id).Msg("getting shard state") + hosts := shard.Hosts + res := NewShardMetrics() + res.ShardId = shard.Id + replicaMetrics := NewHostMetrics() + for _, 
host := range hosts {
+		hostsMetrics, isMaster, err := b.getHostStatus(ctx, host)
+		if err != nil {
+			return nil, err
+		}
+		if hostsMetrics == nil {
+			continue
+		}
+		if isMaster {
+			res.SetMasterMetrics(hostsMetrics)
+			res.Master = host
+			continue
+		}
+		replicaThreshold := b.threshold[metricsCount:]
+		if replicaMetrics.MaxRelative(replicaThreshold) < hostsMetrics.MaxRelative(replicaThreshold) {
+			replicaMetrics = hostsMetrics
+			res.TargetReplica = host
+		}
+	}
+	res.SetReplicaMetrics(replicaMetrics)
+	return res, nil
+}
+
+func (b *BalancerImpl) getHostStatus(ctx context.Context, dsn string) (metrics HostMetrics, isMaster bool, err error) {
+	spqrlog.Zero.Debug().Str("host", dsn).Msg("getting host state")
+	conn, err := pgx.Connect(ctx, dsn)
+	if err != nil {
+		// an unreachable host is skipped instead of failing the whole shard:
+		// the caller treats nil metrics as "host unavailable"
+		return nil, false, nil
+	}
+	defer func() { _ = conn.Close(ctx) }()
+	metrics = NewHostMetrics()
+
+	row := conn.QueryRow(ctx, "SELECT NOT pg_is_in_recovery() as is_master;")
+	if err = row.Scan(&isMaster); err != nil {
+		return nil, false, err
+	}
+
+	query := fmt.Sprintf(`
+	SELECT coalesce(SUM((user_time + system_time)), 0) AS cpu_total
+    FROM pgcs_get_stats_time_interval(now() - interval '%ds', now())
+`, config.BalancerConfig().StatIntervalSec)
+	spqrlog.Zero.Debug().Str("query", query).Msg("Getting cpu stats")
+	row = conn.QueryRow(ctx, query)
+	if err = row.Scan(&metrics[cpuMetric]); err != nil {
+		return nil, isMaster, err
+	}
+
+	query = `SELECT SUM(pg_database_size(datname)) as total_size
+	FROM pg_database
+	WHERE datname != 'template0'
+	  AND datname != 'template1'
+	  AND datname != 'postgres';`
+	spqrlog.Zero.Debug().Str("query", query).Msg("Getting space stats")
+	row = conn.QueryRow(ctx, query)
+	if err = row.Scan(&metrics[spaceMetric]); err != nil {
+		return nil, isMaster, err
+	}
+
+	spqrlog.Zero.Debug().
+		Float64("cpu-metric", metrics[cpuMetric]).
+		Float64("space-metric", metrics[spaceMetric]).
+		Bool("is master", isMaster).
+ Msg("got host state") + return +} + +// getStatsByKeyRange gets statistics by key range & updates ShardMetrics +func (b *BalancerImpl) getStatsByKeyRange(ctx context.Context, shard *ShardMetrics) error { + spqrlog.Zero.Debug().Str("shard", shard.ShardId).Msg("getting shard detailed state") + + type paramsStruct struct { + Host string + MetricsStartInd int + } + paramsList := []paramsStruct{ + {Host: shard.Master, MetricsStartInd: 0}, + } + if shard.TargetReplica != "" { + paramsList = append(paramsList, paramsStruct{Host: shard.TargetReplica, MetricsStartInd: metricsCount}) + } + for _, params := range paramsList { + spqrlog.Zero.Debug().Str("host", params.Host).Msg("getting host detailed state") + conn, err := pgx.Connect(ctx, params.Host) + if err != nil { + return err + } + query := fmt.Sprintf(` + SELECT + comment_keys->>'key_range_id' AS key_range_id, + SUM(user_time + system_time) AS cpu + FROM ( + SELECT * + FROM pgcs_get_stats_time_interval(now() - interval '%ds', now()) + WHERE comment_keys->>'key_range_id' IS NOT NULL + ) as pg_comment_stats + GROUP BY key_range_id; +`, config.BalancerConfig().StatIntervalSec) + rows, err := conn.Query(ctx, query) + if err != nil { + return err + } + for rows.Next() { + krId := "" + cpu := 0.0 + if err = rows.Scan(&krId, &cpu); err != nil { + return err + } + if _, ok := b.krIdx[krId]; !ok { + continue + } + if _, ok := shard.MetricsKR[krId]; !ok { + shard.MetricsKR[krId] = make([]float64, 2*metricsCount) + } + shard.MetricsKR[krId][params.MetricsStartInd+cpuMetric] = cpu + } + } + + conn, err := pgx.Connect(ctx, shard.Master) + if err != nil { + return err + } + + for _, i := range b.shardKr[shard.ShardId] { + krg := b.keyRanges[i] + if krg.ShardID != shard.ShardId { + continue + } + rels, err := b.getKRRelations(ctx, krg) + if err != nil { + return err + } + + for _, rel := range rels { + queryRaw := ` + SELECT sum(pg_column_size(t.*)) as filesize, count(*) as filerow + FROM %s as t + WHERE %s; +` + var nextKR *kr.KeyRange + if i < len(b.keyRanges)-1 { + nextKR = b.keyRanges[i+1] + } + condition, err := b.getKRCondition(rel, krg, nextKR, "t") + if err != nil { + return err + } + query := fmt.Sprintf(queryRaw, rel.Name, condition) + spqrlog.Zero.Debug().Str("query", query).Msg("getting space usage & key count") + + row := conn.QueryRow(ctx, query) + var size, count int64 + if err := row.Scan(&size, &count); err != nil { + return err + } + if _, ok := shard.MetricsKR[krg.ID]; !ok { + shard.MetricsKR[krg.ID] = make([]float64, 2*metricsCount) + } + shard.MetricsKR[krg.ID][spaceMetric] += float64(size) + shard.KeyCountKR[krg.ID] += count + if _, ok := shard.KeyCountRelKR[krg.ID]; !ok { + shard.KeyCountRelKR[krg.ID] = make(map[string]int64) + } + shard.KeyCountRelKR[krg.ID][rel.Name] = count + } + } + return nil +} + +func (b *BalancerImpl) getKRRelations(ctx context.Context, kRange *kr.KeyRange) ([]*distributions.DistributedRelation, error) { + distributionService := protos.NewDistributionServiceClient(b.coordinatorConn) + res, err := distributionService.GetDistribution(ctx, &protos.GetDistributionRequest{Id: kRange.Distribution}) + if err != nil { + return nil, err + } + rels := make([]*distributions.DistributedRelation, len(res.Distribution.Relations)) + for i, relProto := range res.Distribution.Relations { + rels[i] = distributions.DistributedRelationFromProto(relProto) + } + return rels, nil +} + +// getKRCondition returns SQL condition for elements of distributed relation between two key ranges +// TODO support multidimensional key ranges 
+func (b *BalancerImpl) getKRCondition(rel *distributions.DistributedRelation, kRange *kr.KeyRange, nextKR *kr.KeyRange, prefix string) (string, error) {
+	buf := make([]string, 0, len(rel.DistributionKey))
+	for i, entry := range rel.DistributionKey {
+		// TODO remove after multidimensional key range support
+		if i > 0 {
+			break
+		}
+		// TODO add hash (depends on col type)
+		hashedCol := ""
+		if prefix != "" {
+			hashedCol = fmt.Sprintf("%s.%s", prefix, entry.Column)
+		} else {
+			hashedCol = entry.Column
+		}
+		if nextKR != nil {
+			buf = append(buf, fmt.Sprintf("%s >= %s AND %s < %s", hashedCol, string(kRange.LowerBound), hashedCol, string(nextKR.LowerBound)))
+		} else {
+			buf = append(buf, fmt.Sprintf("%s >= %s", hashedCol, string(kRange.LowerBound)))
+		}
+	}
+	return strings.Join(buf, " AND "), nil
+}
+
+// getShardToMoveTo determines where to send keys from the specified key range
+// TODO unit tests
+func (b *BalancerImpl) getShardToMoveTo(shardMetrics []*ShardMetrics, shardIdToMetrics map[string]*ShardMetrics, krId string, krShardId string, keyCountToMove int) (string, bool) {
+	krKeyCount := int(shardIdToMetrics[krShardId].KeyCountKR[krId])
+	shardToMetrics := shardIdToMetrics[krShardId].MetricsKR[krId]
+
+	// try fitting on shards with adjacent key ranges
+	adjShards := b.getAdjacentShards(krId)
+	for adjShard := range adjShards {
+		if b.fitsOnShard(shardToMetrics, keyCountToMove, krKeyCount, shardIdToMetrics[adjShard]) {
+			return adjShard, true
+		}
+	}
+	// try fitting on other shards ordered by criterion load ascending
+	for i := len(shardMetrics) - 1; i >= 0; i-- {
+		if b.fitsOnShard(shardToMetrics, keyCountToMove, krKeyCount, shardMetrics[i]) {
+			return shardMetrics[i].ShardId, true
+		}
+	}
+	return "", false
+}
+
+// moveMaxPossible determines where most keys can be sent
+// TODO unit tests
+func (b *BalancerImpl) moveMaxPossible(shardMetrics []*ShardMetrics, shardIdToMetrics map[string]*ShardMetrics, krId string, krShardId string) (shardId string, maxKeyCount int) {
+	maxKeyCount = -1
+	for i := len(shardMetrics) - 1; i >= 0; i-- {
+		keyCount := b.maxFitOnShard(shardIdToMetrics[krShardId].MetricsKR[krId], shardIdToMetrics[krShardId].KeyCountKR[krId], shardMetrics[i])
+		if keyCount > maxKeyCount {
+			maxKeyCount = keyCount
+			shardId = shardMetrics[i].ShardId
+		}
+	}
+	return
+}
+
+// fitsOnShard reports whether moving keyCountToMove keys from the key range onto the shard keeps every metric below its threshold
+// TODO unit tests
+func (b *BalancerImpl) fitsOnShard(krMetrics []float64, keyCountToMove int, krKeyCount int, shard *ShardMetrics) bool {
+	for kind, metric := range shard.MetricsTotal {
+		meanKeyMetric := krMetrics[kind] / float64(krKeyCount)
+		loadExpectation := meanKeyMetric*float64(keyCountToMove) + metric
+		if b.threshold[kind] < loadExpectation {
+			return false
+		}
+	}
+	return true
+}
+
+// maxFitOnShard determines how many keys we can fit on shard
+// TODO unit tests
+func (b *BalancerImpl) maxFitOnShard(krMetrics []float64, krKeyCount int64, shard *ShardMetrics) (maxCount int) {
+	maxCount = -1
+	for kind, metric := range shard.MetricsTotal {
+		// TODO move const to config
+		krMeanMetricKey := krMetrics[kind] / float64(krKeyCount)
+		count := int(0.8 * ((b.threshold[kind] - metric) / krMeanMetricKey))
+		// every metric must stay below its threshold, so the most constrained
+		// dimension bounds the number of keys that fit
+		if kind == 0 || count < maxCount {
+			maxCount = count
+		}
+	}
+	return
+}
+
+func (b *BalancerImpl) getAdjacentShards(krId string) map[string]struct{} {
+	res := make(map[string]struct{})
+	krIdx := b.krIdx[krId]
+	if krIdx != 0 {
+		res[b.keyRanges[krIdx-1].ShardID] = struct{}{}
+	}
+	if krIdx < len(b.keyRanges)-1 {
+		res[b.keyRanges[krIdx+1].ShardID] = struct{}{}
+	}
+	// do not include current shard
+	delete(res,
b.keyRanges[krIdx].ShardID) + return res +} + +func (b *BalancerImpl) getCriterion(shards []*ShardMetrics) (value float64, kind int) { + value = -1 + kind = -1 + for _, state := range shards { + v, k := MaxRelative(state.MetricsTotal, b.threshold) + if v > value { + value = v + kind = k + } + } + return +} + +func (b *BalancerImpl) getMostLoadedKR(shard *ShardMetrics, kind int) (value float64, krId string) { + value = -1 + for krg := range shard.MetricsKR { + metric := shard.MetricsKR[krg][kind] + totalKRMetric := metric + if totalKRMetric > value { + value = totalKRMetric + krId = krg + } + } + return +} + +func (b *BalancerImpl) getTasks(ctx context.Context, shardFrom *ShardMetrics, krId string, shardToId string, keyCount int) (*tasks.TaskGroup, error) { + spqrlog.Zero.Debug(). + Str("shard_from", shardFrom.ShardId). + Str("shard_to", shardToId). + Str("key_range", krId). + Int("key_count", keyCount). + Msg("generating move tasks") + // Move from beginning or the end of key range + krInd := b.krIdx[krId] + krIdTo := "" + var join tasks.JoinType = tasks.JoinNone + if krInd < len(b.keyRanges)-1 && b.keyRanges[krInd+1].ShardID == shardToId { + krIdTo = b.keyRanges[krInd+1].ID + join = tasks.JoinRight + } else if krInd > 0 && b.keyRanges[krInd-1].ShardID == shardToId { + krIdTo = b.keyRanges[krInd-1].ID + join = tasks.JoinLeft + } + + host := shardFrom.TargetReplica + if host == "" { + host = shardFrom.Master + } + conn, err := pgx.Connect(ctx, host) + if err != nil { + return nil, err + } + + var maxCount int64 = -1 + relName := "" + for r, count := range shardFrom.KeyCountRelKR[krId] { + if count > maxCount { + relName = r + maxCount = count + } + } + if _, ok := b.krIdx[krId]; !ok { + return nil, fmt.Errorf("unknown key range id \"%s\"", krId) + } + var rel *distributions.DistributedRelation = nil + allRels, err := b.getKRRelations(ctx, b.keyRanges[b.krIdx[krId]]) + if err != nil { + return nil, err + } + for _, r := range allRels { + if r.Name == relName { + rel = r + break + } + } + if rel == nil { + return nil, fmt.Errorf("relation \"%s\" not found", relName) + } + + moveCount := min((keyCount+config.BalancerConfig().KeysPerMove-1)/config.BalancerConfig().KeysPerMove, config.BalancerConfig().MaxMoveCount) + + counts := make([]int, moveCount) + for i := 0; i < len(counts)-1; i++ { + counts[i] = config.BalancerConfig().KeysPerMove + } + counts[len(counts)-1] = min(keyCount-(moveCount-1)*config.BalancerConfig().KeysPerMove, config.BalancerConfig().KeysPerMove) + groupTasks := make([]*tasks.Task, moveCount) + totalCount := 0 + // TODO multidimensional key ranges + for i, count := range counts { + offset := totalCount + count + if join != tasks.JoinLeft { + offset-- + } + query := fmt.Sprintf(` + SELECT %s as idx + FROM %s + ORDER BY idx %s + LIMIT 1 + OFFSET %d + `, rel.DistributionKey[0].Column, rel.Name, func() string { + if join != tasks.JoinLeft { + return "DESC" + } + return "" + }(), offset) + spqrlog.Zero.Debug(). + Str("query", query). 
+ Msg("getting split bound") + row := conn.QueryRow(ctx, query) + // TODO typed key ranges + var idx string + if err := row.Scan(&idx); err != nil { + return nil, err + } + groupTasks[len(groupTasks)-1-i] = &tasks.Task{ + ShardFromId: shardFrom.ShardId, + ShardToId: shardToId, + KrIdFrom: krId, + KrIdTo: krIdTo, + Bound: []byte(idx), + } + totalCount += count + } + + return &tasks.TaskGroup{Tasks: groupTasks, JoinType: join}, nil +} + +func (b *BalancerImpl) getCurrentTaskGroupFromQDB(ctx context.Context) (group *tasks.TaskGroup, err error) { + tasksService := protos.NewTasksServiceClient(b.coordinatorConn) + resp, err := tasksService.GetTaskGroup(ctx, &protos.GetTaskGroupRequest{}) + if err != nil { + return nil, err + } + return tasks.TaskGroupFromProto(resp.TaskGroup), nil +} + +func (b *BalancerImpl) syncTaskGroupWithQDB(ctx context.Context, group *tasks.TaskGroup) error { + tasksService := protos.NewTasksServiceClient(b.coordinatorConn) + _, err := tasksService.WriteTaskGroup(ctx, &protos.WriteTaskGroupRequest{TaskGroup: tasks.TaskGroupToProto(group)}) + return err +} + +func (b *BalancerImpl) removeTaskGroupFromQDB(ctx context.Context) error { + tasksService := protos.NewTasksServiceClient(b.coordinatorConn) + _, err := tasksService.RemoveTaskGroup(ctx, &protos.RemoveTaskGroupRequest{}) + return err +} + +func (b *BalancerImpl) executeTasks(ctx context.Context, group *tasks.TaskGroup) error { + + keyRangeService := protos.NewKeyRangeServiceClient(b.coordinatorConn) + + id := uuid.New() + + for len(group.Tasks) > 0 { + task := group.Tasks[len(group.Tasks)-1] + spqrlog.Zero.Debug(). + Str("key_range_from", task.KrIdFrom). + Str("key_range_to", task.KrIdTo). + Str("bound", string(task.Bound)). + Int("state", int(task.State)). + Msg("processing task") + switch task.State { + case tasks.TaskPlanned: + // TODO check for duplicate key range id + newKeyRange := fmt.Sprintf("kr_%s", id.String()) + + if _, err := keyRangeService.SplitKeyRange(ctx, &protos.SplitKeyRangeRequest{ + NewId: newKeyRange, + SourceId: task.KrIdFrom, + Bound: task.Bound, + SplitLeft: group.JoinType == tasks.JoinLeft, + }); err != nil { + return err + } + + task.KrIdTemp = newKeyRange + task.State = tasks.TaskSplit + if err := b.syncTaskGroupWithQDB(ctx, group); err != nil { + // TODO mb retry? + return err + } + continue + case tasks.TaskSplit: + // TODO account for join type + if _, err := keyRangeService.MoveKeyRange(ctx, &protos.MoveKeyRangeRequest{ + Id: task.KrIdTemp, + ToShardId: task.ShardToId, + }); err != nil { + return err + } + task.State = tasks.TaskMoved + if err := b.syncTaskGroupWithQDB(ctx, group); err != nil { + // TODO mb retry? + return err + } + continue + case tasks.TaskMoved: + if group.JoinType != tasks.JoinNone { + if _, err := keyRangeService.MergeKeyRange(ctx, &protos.MergeKeyRangeRequest{ + BaseId: task.KrIdTo, + AppendageId: task.KrIdTemp, + }); err != nil { + return err + } + } else { + for _, otherTask := range group.Tasks { + otherTask.KrIdTo = task.KrIdTemp + } + group.JoinType = tasks.JoinRight + id = uuid.New() + } + group.Tasks = group.Tasks[:len(group.Tasks)-1] + if err := b.syncTaskGroupWithQDB(ctx, group); err != nil { + // TODO mb retry? + return err + } + continue + default: + return fmt.Errorf("unknown task state %d", task.State) + } + } + + // TODO mb retry? 
+	return b.removeTaskGroupFromQDB(ctx)
+}
+
+func (b *BalancerImpl) updateKeyRanges(ctx context.Context) error {
+	keyRangeService := protos.NewKeyRangeServiceClient(b.coordinatorConn)
+	keyRangesProto, err := keyRangeService.ListAllKeyRanges(ctx, &protos.ListAllKeyRangesRequest{})
+	if err != nil {
+		return err
+	}
+	keyRanges := make([]*kr.KeyRange, len(keyRangesProto.KeyRangesInfo))
+	for i, krProto := range keyRangesProto.KeyRangesInfo {
+		keyRanges[i] = kr.KeyRangeFromProto(krProto)
+	}
+	sort.Slice(keyRanges, func(i, j int) bool {
+		return kr.CmpRangesLess(keyRanges[i].LowerBound, keyRanges[j].LowerBound)
+	})
+	b.keyRanges = keyRanges
+	b.krIdx = make(map[string]int)
+	b.shardKr = make(map[string][]int)
+	for i, krg := range b.keyRanges {
+		b.krIdx[krg.ID] = i
+		if _, ok := b.shardKr[krg.ShardID]; !ok {
+			b.shardKr[krg.ShardID] = make([]int, 0)
+		}
+		b.shardKr[krg.ShardID] = append(b.shardKr[krg.ShardID], i)
+	}
+
+	return nil
+} diff --git a/balancer/provider/statistics.go b/balancer/provider/statistics.go new file mode 100644 index 000000000..6ff60db64 --- /dev/null +++ b/balancer/provider/statistics.go @@ -0,0 +1,65 @@
+package provider
+
+const (
+	cpuMetric = iota
+	spaceMetric
+	metricsCount // insert new metric types above
+)
+
+type ShardMetrics struct {
+	ShardId       string
+	MetricsTotal  []float64                   // master metrics followed by replica metrics
+	MetricsKR     map[string][]float64        // accumulated metric values by key range
+	KeyCountKR    map[string]int64            // number of keys per key range
+	KeyCountRelKR map[string]map[string]int64 // per-relation key counts by key range
+	Master        string                      // DSN of the master host
+	TargetReplica string                      // DSN of the most loaded replica
+}
+
+type HostMetrics []float64
+
+func NewHostMetrics() HostMetrics {
+	return make([]float64, metricsCount)
+}
+
+func NewShardMetrics() *ShardMetrics {
+	return &ShardMetrics{
+		MetricsTotal:  make([]float64, 2*metricsCount),
+		MetricsKR:     make(map[string][]float64),
+		KeyCountKR:    make(map[string]int64),
+		KeyCountRelKR: make(map[string]map[string]int64),
+	}
+}
+
+func (m *ShardMetrics) SetMasterMetrics(metrics HostMetrics) {
+	for i := 0; i < metricsCount; i++ {
+		m.MetricsTotal[i] = metrics[i]
+	}
+}
+
+func (m *ShardMetrics) SetReplicaMetrics(metrics HostMetrics) {
+	for i := 0; i < metricsCount; i++ {
+		m.MetricsTotal[i+metricsCount] = metrics[i]
+	}
+}
+
+func (m HostMetrics) MaxRelative(threshold []float64) (val float64) {
+	val, _ = MaxRelative(m, threshold)
+	return val
+}
+
+func MaxRelative(metrics, threshold []float64) (val float64, kind int) {
+	if len(metrics) != len(threshold) {
+		panic("incorrect size of threshold")
+	}
+
+	val = -1
+	for k, metric := range metrics {
+		relative := metric / threshold[k]
+		if relative > val {
+			val = relative
+			kind = k
+		}
+	}
+	return
+} diff --git a/cmd/balancer/main.go b/cmd/balancer/main.go new file mode 100644 index 000000000..1433a57e1 --- /dev/null +++ b/cmd/balancer/main.go @@ -0,0 +1,48 @@
+package main
+
+import (
+	"github.com/pg-sharding/spqr/balancer/app"
+	"github.com/pg-sharding/spqr/balancer/provider"
+	"github.com/pg-sharding/spqr/pkg"
+	"github.com/pg-sharding/spqr/pkg/config"
+	"github.com/pg-sharding/spqr/pkg/spqrlog"
+	"github.com/spf13/cobra"
+)
+
+var (
+	cfgPath string
+)
+
+var rootCmd = &cobra.Command{
+	Use:   "spqr-balancer --config `path-to-config`",
+	Short: "spqr-balancer",
+	Long:  "spqr-balancer",
+	CompletionOptions: cobra.CompletionOptions{
+		DisableDefaultCmd: true,
+	},
+	Version:       pkg.SpqrVersionRevision,
+	SilenceUsage:  false,
+	SilenceErrors: false,
+	RunE: func(cmd *cobra.Command, args []string) error {
+		if err := config.LoadBalancerCfg(cfgPath); err != nil {
+			return err
+		}
+
+		balancer, err :=
provider.NewBalancer() + if err != nil { + return err + } + app := app.NewApp(balancer) + return app.Run() + }, +} + +func init() { + rootCmd.PersistentFlags().StringVarP(&cfgPath, "config", "c", "/etc/spqr/balancer.yaml", "path to config file") +} + +func main() { + if err := rootCmd.Execute(); err != nil { + spqrlog.Zero.Error().Err(err).Msg("") + } +} diff --git a/coordinator/app/app.go b/coordinator/app/app.go index a69c37a1d..ba7216a9a 100644 --- a/coordinator/app/app.go +++ b/coordinator/app/app.go @@ -104,12 +104,14 @@ func (app *App) ServeGrpcApi(wg *sync.WaitGroup) error { shardingRulesServ := provider.NewShardingRulesServer(app.coordinator) shardServ := provider.NewShardServer(app.coordinator) dsServ := provider.NewDistributionServer(app.coordinator) + tasksServ := provider.NewTasksServer(app.coordinator) protos.RegisterKeyRangeServiceServer(serv, krServ) protos.RegisterRouterServiceServer(serv, rrServ) protos.RegisterTopologyServiceServer(serv, topServ) protos.RegisterShardingRulesServiceServer(serv, shardingRulesServ) protos.RegisterShardServiceServer(serv, shardServ) protos.RegisterDistributionServiceServer(serv, dsServ) + protos.RegisterTasksServiceServer(serv, tasksServ) address := net.JoinHostPort(config.CoordinatorConfig().Host, config.CoordinatorConfig().GrpcApiPort) listener, err := net.Listen("tcp", address) diff --git a/coordinator/provider/coordinator.go b/coordinator/provider/coordinator.go index 7b98d5228..533b1fef5 100644 --- a/coordinator/provider/coordinator.go +++ b/coordinator/provider/coordinator.go @@ -33,6 +33,7 @@ import ( "github.com/pg-sharding/spqr/pkg/connectiterator" "github.com/pg-sharding/spqr/pkg/models/datashards" "github.com/pg-sharding/spqr/pkg/models/kr" + "github.com/pg-sharding/spqr/pkg/models/tasks" "github.com/pg-sharding/spqr/pkg/pool" routerproto "github.com/pg-sharding/spqr/pkg/protos" "github.com/pg-sharding/spqr/qdb" @@ -987,6 +988,22 @@ func (qc *qdbCoordinator) UnregisterRouter(ctx context.Context, rID string) erro return qc.db.DeleteRouter(ctx, rID) } +func (qc *qdbCoordinator) GetTaskGroup(ctx context.Context) (*tasks.TaskGroup, error) { + group, err := qc.db.GetTaskGroup(ctx) + if err != nil { + return nil, err + } + return tasks.TaskGroupFromDb(group), nil +} + +func (qc *qdbCoordinator) WriteTaskGroup(ctx context.Context, taskGroup *tasks.TaskGroup) error { + return qc.db.WriteTaskGroup(ctx, tasks.TaskGroupToDb(taskGroup)) +} + +func (qc *qdbCoordinator) RemoveTaskGroup(ctx context.Context) error { + return qc.db.RemoveTaskGroup(ctx) +} + // TODO : unit tests func (qc *qdbCoordinator) PrepareClient(nconn net.Conn) (CoordinatorClient, error) { cl := psqlclient.NewPsqlClient(nconn, port.DefaultRouterPortType, "") @@ -1277,6 +1294,6 @@ func (qc *qdbCoordinator) AlterDistributionDetach(ctx context.Context, id string }) } -func (qc *qdbCoordinator) GetShardInfo(ctx context.Context, shardID string) (*datashards.DataShard, error) { +func (qc *qdbCoordinator) GetShard(ctx context.Context, shardID string) (*datashards.DataShard, error) { panic("implement or delete me") } diff --git a/coordinator/provider/shards.go b/coordinator/provider/shards.go index 6534cdb5d..69adf69c3 100644 --- a/coordinator/provider/shards.go +++ b/coordinator/provider/shards.go @@ -3,7 +3,6 @@ package provider import ( "context" - "github.com/pg-sharding/spqr/pkg/config" routerproto "github.com/pg-sharding/spqr/pkg/protos" "github.com/pg-sharding/spqr/pkg/shard" "github.com/pg-sharding/spqr/pkg/txstatus" @@ -31,9 +30,7 @@ var _ protos.ShardServiceServer = 
&ShardServer{} func (s *ShardServer) AddDataShard(ctx context.Context, request *protos.AddShardRequest) (*protos.AddShardReply, error) { newShard := request.GetShard() - if err := s.impl.AddDataShard(ctx, datashards.NewDataShard(newShard.Id, &config.Shard{ - Hosts: newShard.Hosts, - })); err != nil { + if err := s.impl.AddDataShard(ctx, datashards.DataShardFromProto(newShard)); err != nil { return nil, err } @@ -54,11 +51,8 @@ func (s *ShardServer) ListShards(ctx context.Context, _ *protos.ListShardsReques protoShards := make([]*protos.Shard, 0, len(shardList)) - for _, shard := range shardList { - protoShards = append(protoShards, &protos.Shard{ - Hosts: shard.Cfg.Hosts, - Id: shard.ID, - }) + for _, sh := range shardList { + protoShards = append(protoShards, datashards.DataShardToProto(sh)) } return &protos.ListShardsReply{ @@ -67,17 +61,14 @@ func (s *ShardServer) ListShards(ctx context.Context, _ *protos.ListShardsReques } // TODO : unit tests -func (s *ShardServer) GetShardInfo(ctx context.Context, shardRequest *protos.ShardRequest) (*protos.ShardInfoReply, error) { - shardInfo, err := s.impl.GetShardInfo(ctx, shardRequest.Id) +func (s *ShardServer) GetShard(ctx context.Context, shardRequest *protos.ShardRequest) (*protos.ShardReply, error) { + sh, err := s.impl.GetShard(ctx, shardRequest.Id) if err != nil { return nil, err } - return &protos.ShardInfoReply{ - ShardInfo: &protos.ShardInfo{ - Hosts: shardInfo.Cfg.Hosts, - Id: shardInfo.ID, - }, + return &protos.ShardReply{ + Shard: datashards.DataShardToProto(sh), }, nil } diff --git a/coordinator/provider/tasks.go b/coordinator/provider/tasks.go new file mode 100644 index 000000000..af78c712a --- /dev/null +++ b/coordinator/provider/tasks.go @@ -0,0 +1,39 @@ +package provider + +import ( + "context" + "github.com/pg-sharding/spqr/coordinator" + "github.com/pg-sharding/spqr/pkg/models/tasks" + protos "github.com/pg-sharding/spqr/pkg/protos" +) + +type TasksServer struct { + protos.UnimplementedTasksServiceServer + + impl coordinator.Coordinator +} + +func NewTasksServer(impl coordinator.Coordinator) *TasksServer { + return &TasksServer{ + impl: impl, + } +} + +var _ protos.TasksServiceServer = &TasksServer{} + +func (t TasksServer) GetTaskGroup(ctx context.Context, _ *protos.GetTaskGroupRequest) (*protos.GetTaskGroupReply, error) { + group, err := t.impl.GetTaskGroup(ctx) + if err != nil { + return nil, err + } + return &protos.GetTaskGroupReply{TaskGroup: tasks.TaskGroupToProto(group)}, nil +} + +func (t TasksServer) WriteTaskGroup(ctx context.Context, request *protos.WriteTaskGroupRequest) (*protos.WriteTaskGroupReply, error) { + err := t.impl.WriteTaskGroup(ctx, tasks.TaskGroupFromProto(request.TaskGroup)) + return &protos.WriteTaskGroupReply{}, err +} + +func (t TasksServer) RemoveTaskGroup(ctx context.Context, _ *protos.RemoveTaskGroupRequest) (*protos.RemoveTaskGroupReply, error) { + return &protos.RemoveTaskGroupReply{}, t.impl.RemoveTaskGroup(ctx) +} diff --git a/docker/shard/Dockerfile b/docker/shard/Dockerfile index 9174aee23..6eea78863 100644 --- a/docker/shard/Dockerfile +++ b/docker/shard/Dockerfile @@ -25,6 +25,7 @@ RUN curl https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - && \ sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/pgdg.list' RUN apt-get update && apt-get install -y --no-install-recommends \ + git \ sudo postgresql-$POSTGRES_VERSION \ build-essential \ cmake \ @@ -36,10 +37,15 @@ RUN apt-get update && apt-get install -y 
--no-install-recommends \ libpq-dev \ vim \ postgresql-common \ -postgresql-server-dev-$POSTGRES_VERSION +postgresql-server-dev-$POSTGRES_VERSION \ +postgresql-${POSTGRES_VERSION}-pg-stat-kcache COPY ./docker/shard/bin/ /usr/local/bin/ RUN chmod a+x /usr/local/bin/entrypoint.sh +RUN git clone https://github.com/munakoiso/pg_comment_stats.git && \ + cd pg_comment_stats && \ + make && make install + ENTRYPOINT /usr/local/bin/entrypoint.sh $POSTGRES_VERSION diff --git a/docker/shard/bin/setup b/docker/shard/bin/setup index 342035e01..66caaa4d5 100755 --- a/docker/shard/bin/setup +++ b/docker/shard/bin/setup @@ -16,6 +16,7 @@ cat >> /var/lib/postgresql/$POSTGRES_VERSION/main/postgresql.conf <<-EOF listen_addresses = '*' port = 6432 max_prepared_transactions = 5 +shared_preload_libraries = 'pg_stat_statements,pg_stat_kcache,pg_comment_stats' EOF sudo -u postgres /usr/lib/postgresql/$POSTGRES_VERSION/bin/pg_ctl -D /var/lib/postgresql/$POSTGRES_VERSION/main/ start @@ -44,13 +45,12 @@ sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE DATABASE $POST exit 1 } - -# # Create extension -# sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS postgres_fdw; CREATE EXTENSION IF NOT EXISTS dblink" -d db1 >> $SETUP_LOG 2>&1 || { -# echo "ERROR: users creation failed, examine the log" -# cat "$SETUP_LOG" -# exit 1 -# } + # Create extension + sudo -u postgres psql -p 6432 -h localhost -U postgres -c "CREATE EXTENSION IF NOT EXISTS pg_stat_statements; CREATE EXTENSION IF NOT EXISTS pg_stat_kcache; CREATE EXTENSION IF NOT EXISTS pg_comment_stats;" -d $POSTGRES_DB >> $SETUP_LOG 2>&1 || { + echo "ERROR: extension creation failed, examine the log" + cat "$SETUP_LOG" + exit 1 + } # # Grant permissions # sudo -u postgres psql -p 6432 -h localhost -U postgres -c "ALTER ROLE $POSTGRES_USER SUPERUSER" -d db1 >> $SETUP_LOG 2>&1 || { diff --git a/pkg/config/balancer.go b/pkg/config/balancer.go new file mode 100644 index 000000000..acd06c150 --- /dev/null +++ b/pkg/config/balancer.go @@ -0,0 +1,75 @@
+package config
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"os"
+	"strings"
+
+	"github.com/BurntSushi/toml"
+	"gopkg.in/yaml.v2"
+)
+
+const defaultBalancerTimeout = 60
+
+type Balancer struct {
+	LogLevel string `json:"log_level" toml:"log_level" yaml:"log_level"` // TODO usage
+
+	CoordinatorAddress string `json:"coordinator_address" toml:"coordinator_address" yaml:"coordinator_address"`
+
+	// TODO set default values (probably the type needs to be de-exported)
+	CpuThreshold   float64 `json:"cpu_threshold" yaml:"cpu_threshold" toml:"cpu_threshold"`
+	SpaceThreshold float64 `json:"space_threshold" yaml:"space_threshold" toml:"space_threshold"`
+
+	StatIntervalSec int `json:"stat_interval_sec" yaml:"stat_interval_sec" toml:"stat_interval_sec"`
+
+	MaxMoveCount int `json:"max_move_count" yaml:"max_move_count" toml:"max_move_count"`
+	KeysPerMove  int `json:"keys_per_move" yaml:"keys_per_move" toml:"keys_per_move"`
+
+	TimeoutSec int `json:"timeout" yaml:"timeout" toml:"timeout"`
+}
+
+var cfgBalancer Balancer
+
+func LoadBalancerCfg(cfgPath string) error {
+	file, err := os.Open(cfgPath)
+	if err != nil {
+		return err
+	}
+	defer func() { _ = file.Close() }()
+
+	if err := initBalancerConfig(file, cfgPath); err != nil {
+		return err
+	}
+
+	if cfgBalancer.TimeoutSec == 0 {
+		cfgBalancer.TimeoutSec = defaultBalancerTimeout
+	}
+
+	configBytes, err := json.MarshalIndent(cfgBalancer, "", "  ")
+	if err != nil {
+		return err
+	}
+
+	log.Println("Running config:",
string(configBytes))
+	return nil
+}
+
+func initBalancerConfig(file *os.File, filepath string) error {
+	if strings.HasSuffix(filepath, ".toml") {
+		_, err := toml.NewDecoder(file).Decode(&cfgBalancer)
+		return err
+	}
+	if strings.HasSuffix(filepath, ".yaml") {
+		return yaml.NewDecoder(file).Decode(&cfgBalancer)
+	}
+	if strings.HasSuffix(filepath, ".json") {
+		return json.NewDecoder(file).Decode(&cfgBalancer)
+	}
+	return fmt.Errorf("unknown config format type: %s. Use .toml, .yaml or .json suffix in filename", filepath)
+}
+
+func BalancerConfig() *Balancer {
+	return &cfgBalancer
+} diff --git a/pkg/coord/adapter.go b/pkg/coord/adapter.go index 83e4fa436..cdb4b7c17 100644 --- a/pkg/coord/adapter.go +++ b/pkg/coord/adapter.go @@ -2,6 +2,7 @@ package coord import ( "context"
 
 	"github.com/pg-sharding/spqr/pkg/config"
 	"github.com/pg-sharding/spqr/pkg/meta"
+	"github.com/pg-sharding/spqr/pkg/models/tasks"
@@ -298,12 +299,15 @@ func (a *Adapter) ListShards(ctx context.Context) ([]*datashards.DataShard, erro }
 
 // TODO : unit tests
-func (a *Adapter) GetShardInfo(ctx context.Context, shardID string) (*datashards.DataShard, error) {
+func (a *Adapter) GetShard(ctx context.Context, shardID string) (*datashards.DataShard, error) {
 	c := proto.NewShardServiceClient(a.conn)
-	resp, err := c.GetShardInfo(ctx, &proto.ShardRequest{Id: shardID})
+	resp, err := c.GetShard(ctx, &proto.ShardRequest{Id: shardID})
+	if err != nil {
+		return nil, err
+	}
 	return &datashards.DataShard{
-		ID: resp.ShardInfo.Id,
-		Cfg: &config.Shard{Hosts: resp.ShardInfo.Hosts},
+		ID:  resp.Shard.Id,
+		Cfg: &config.Shard{Hosts: resp.Shard.Hosts},
 	}, err
 }
@@ -402,6 +403,29 @@ func (a *Adapter) GetRelationDistribution(ctx context.Context, id string) (*dist return distributions.DistributionFromProto(resp.Distribution), nil }
 
+func (a *Adapter) GetTaskGroup(ctx context.Context) (*tasks.TaskGroup, error) {
+	tasksService := proto.NewTasksServiceClient(a.conn)
+	res, err := tasksService.GetTaskGroup(ctx, &proto.GetTaskGroupRequest{})
+	if err != nil {
+		return nil, err
+	}
+	return tasks.TaskGroupFromProto(res.TaskGroup), nil
+}
+
+func (a *Adapter) WriteTaskGroup(ctx context.Context, taskGroup *tasks.TaskGroup) error {
+	tasksService := proto.NewTasksServiceClient(a.conn)
+	_, err := tasksService.WriteTaskGroup(ctx, &proto.WriteTaskGroupRequest{
+		TaskGroup: tasks.TaskGroupToProto(taskGroup),
+	})
+	return err
+}
+
+func (a *Adapter) RemoveTaskGroup(ctx context.Context) error {
+	tasksService := proto.NewTasksServiceClient(a.conn)
+	_, err := tasksService.RemoveTaskGroup(ctx, &proto.RemoveTaskGroupRequest{})
+	return err
+}
+
 // TODO : unit tests
 func (a *Adapter) UpdateCoordinator(ctx context.Context, address string) error {
 	c := proto.NewTopologyServiceClient(a.conn)
diff --git a/pkg/coord/local/clocal.go b/pkg/coord/local/clocal.go index 11a6b661d..02bb115fb 100644 --- a/pkg/coord/local/clocal.go +++ b/pkg/coord/local/clocal.go @@ -3,6 +3,7 @@ package local import ( "context" "fmt" "math/rand"
 
 	"sync"
+	"github.com/pg-sharding/spqr/pkg/models/tasks"
@@ -32,6 +33,22 @@ type LocalCoordinator struct { qdb qdb.QDB }
 
+func (lc *LocalCoordinator) GetTaskGroup(ctx context.Context) (*tasks.TaskGroup, error) {
+	group, err := lc.qdb.GetTaskGroup(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return tasks.TaskGroupFromDb(group), nil
+}
+
+func (lc *LocalCoordinator) WriteTaskGroup(ctx context.Context, taskGroup *tasks.TaskGroup) error {
+	return lc.qdb.WriteTaskGroup(ctx, tasks.TaskGroupToDb(taskGroup))
+}
+
+func (lc *LocalCoordinator) RemoveTaskGroup(ctx context.Context) error {
+ return lc.qdb.RemoveTaskGroup(ctx) +} + // TODO : unit tests func (lc *LocalCoordinator) ListDistributions(ctx context.Context) ([]*distributions.Distribution, error) { lc.mu.Lock() @@ -452,7 +469,7 @@ func (qr *LocalCoordinator) GetCoordinator(ctx context.Context) (string, error) return addr, err } -func (qr *LocalCoordinator) GetShardInfo(ctx context.Context, shardID string) (*datashards.DataShard, error) { +func (qr *LocalCoordinator) GetShard(ctx context.Context, shardID string) (*datashards.DataShard, error) { return nil, ErrNotCoordinator } diff --git a/pkg/meta/meta.go b/pkg/meta/meta.go index 2ee69953e..1d6f10956 100644 --- a/pkg/meta/meta.go +++ b/pkg/meta/meta.go @@ -9,6 +9,7 @@ import ( "github.com/pg-sharding/spqr/pkg/connectiterator" "github.com/pg-sharding/spqr/pkg/models/distributions" "github.com/pg-sharding/spqr/pkg/models/spqrerror" + "github.com/pg-sharding/spqr/pkg/models/tasks" "github.com/pg-sharding/spqr/pkg/models/topology" "github.com/pg-sharding/spqr/pkg/pool" "github.com/pg-sharding/spqr/pkg/shard" @@ -26,6 +27,7 @@ type EntityMgr interface { topology.RouterMgr datashards.ShardsMgr distributions.DistributionMgr + tasks.TaskMgr ShareKeyRange(id string) error diff --git a/pkg/models/datashards/datashard.go b/pkg/models/datashards/datashard.go index 728610967..31cdb8dcc 100644 --- a/pkg/models/datashards/datashard.go +++ b/pkg/models/datashards/datashard.go @@ -1,6 +1,9 @@ package datashards -import "github.com/pg-sharding/spqr/pkg/config" +import ( + "github.com/pg-sharding/spqr/pkg/config" + proto "github.com/pg-sharding/spqr/pkg/protos" +) type DataShard struct { ID string @@ -13,3 +16,17 @@ func NewDataShard(name string, cfg *config.Shard) *DataShard { Cfg: cfg, } } + +func DataShardToProto(shard *DataShard) *proto.Shard { + return &proto.Shard{ + Hosts: shard.Cfg.Hosts, + Id: shard.ID, + } +} + +func DataShardFromProto(shard *proto.Shard) *DataShard { + return NewDataShard(shard.Id, &config.Shard{ + Hosts: shard.Hosts, + Type: config.DataShard, + }) +} diff --git a/pkg/models/datashards/datashardmgr.go b/pkg/models/datashards/datashardmgr.go index 265666d33..95b4f345a 100644 --- a/pkg/models/datashards/datashardmgr.go +++ b/pkg/models/datashards/datashardmgr.go @@ -8,6 +8,6 @@ type ShardsMgr interface { AddDataShard(ctx context.Context, shard *DataShard) error AddWorldShard(ctx context.Context, shard *DataShard) error ListShards(ctx context.Context) ([]*DataShard, error) - GetShardInfo(ctx context.Context, shardID string) (*DataShard, error) + GetShard(ctx context.Context, shardID string) (*DataShard, error) DropShard(ctx context.Context, id string) error } diff --git a/pkg/models/tasks/taskmgr.go b/pkg/models/tasks/taskmgr.go new file mode 100644 index 000000000..92a5b1205 --- /dev/null +++ b/pkg/models/tasks/taskmgr.go @@ -0,0 +1,9 @@ +package tasks + +import "context" + +type TaskMgr interface { + GetTaskGroup(ctx context.Context) (*TaskGroup, error) + WriteTaskGroup(ctx context.Context, taskGroup *TaskGroup) error + RemoveTaskGroup(ctx context.Context) error +} diff --git a/pkg/models/tasks/tasks.go b/pkg/models/tasks/tasks.go new file mode 100644 index 000000000..8febab1c1 --- /dev/null +++ b/pkg/models/tasks/tasks.go @@ -0,0 +1,189 @@ +package tasks + +import ( + protos "github.com/pg-sharding/spqr/pkg/protos" + "github.com/pg-sharding/spqr/qdb" +) + +type Task struct { + ShardFromId string + ShardToId string + KrIdFrom string + KrIdTo string + Bound []byte + KrIdTemp string + State TaskState +} + +type TaskState int + +const ( + TaskPlanned = iota + 
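+	// a task advances Planned -> Split -> Moved; executeTasks persists the
+	// task group to QDB after each transition so an interrupted run can resume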
TaskSplit + TaskMoved +) + +type JoinType int + +const ( + JoinNone = iota + JoinLeft + JoinRight +) + +type TaskGroup struct { + Tasks []*Task + JoinType JoinType +} + +func TaskGroupToProto(group *TaskGroup) *protos.TaskGroup { + return &protos.TaskGroup{ + Tasks: func() []*protos.Task { + res := make([]*protos.Task, len(group.Tasks)) + for i, t := range group.Tasks { + res[i] = TaskToProto(t) + } + return res + }(), + JoinType: JoinTypeToProto(group.JoinType), + } +} + +func TaskToProto(task *Task) *protos.Task { + return &protos.Task{ + ShardIdFrom: task.ShardFromId, + ShardIdTo: task.ShardToId, + KeyRangeIdFrom: task.KrIdFrom, + KeyRangeIdTo: task.KrIdTo, + KeyRangeIdTemp: task.KrIdTemp, + Bound: task.Bound, + Status: TaskStateToProto(task.State), + } +} + +func TaskStateToProto(state TaskState) protos.TaskStatus { + switch state { + case TaskPlanned: + return protos.TaskStatus_Planned + case TaskSplit: + return protos.TaskStatus_Split + case TaskMoved: + return protos.TaskStatus_Moved + default: + panic("incorrect task state") + } +} + +func JoinTypeToProto(t JoinType) protos.JoinType { + switch t { + case JoinNone: + return protos.JoinType_JoinNone + case JoinLeft: + return protos.JoinType_JoinLeft + case JoinRight: + return protos.JoinType_JoinRight + default: + panic("incorrect join type") + } +} + +func TaskGroupFromProto(group *protos.TaskGroup) *TaskGroup { + return &TaskGroup{ + Tasks: func() []*Task { + res := make([]*Task, len(group.Tasks)) + for i, t := range group.Tasks { + res[i] = TaskFromProto(t) + } + return res + }(), + JoinType: JoinTypeFromProto(group.JoinType), + } +} + +func TaskFromProto(task *protos.Task) *Task { + return &Task{ + ShardFromId: task.ShardIdFrom, + ShardToId: task.ShardIdTo, + KrIdFrom: task.KeyRangeIdFrom, + KrIdTo: task.KeyRangeIdTo, + KrIdTemp: task.KeyRangeIdTemp, + Bound: task.Bound, + State: TaskStateFromProto(task.Status), + } +} + +func TaskStateFromProto(state protos.TaskStatus) TaskState { + switch state { + case protos.TaskStatus_Planned: + return TaskPlanned + case protos.TaskStatus_Split: + return TaskSplit + case protos.TaskStatus_Moved: + return TaskMoved + default: + panic("incorrect task state") + } +} + +func JoinTypeFromProto(t protos.JoinType) JoinType { + switch t { + case protos.JoinType_JoinNone: + return JoinNone + case protos.JoinType_JoinLeft: + return JoinLeft + case protos.JoinType_JoinRight: + return JoinRight + default: + panic("incorrect join type") + } +} + +func TaskGroupToDb(group *TaskGroup) *qdb.TaskGroup { + return &qdb.TaskGroup{ + Tasks: func() []*qdb.Task { + res := make([]*qdb.Task, len(group.Tasks)) + for i, task := range group.Tasks { + res[i] = TaskToDb(task) + } + return res + }(), + JoinType: int(group.JoinType), + } +} + +func TaskToDb(task *Task) *qdb.Task { + return &qdb.Task{ + ShardFromId: task.ShardFromId, + ShardToId: task.ShardToId, + KrIdFrom: task.KrIdFrom, + KrIdTo: task.KrIdTo, + KrIdTemp: task.KrIdTemp, + Bound: task.Bound, + State: int(task.State), + } +} + +func TaskGroupFromDb(group *qdb.TaskGroup) *TaskGroup { + return &TaskGroup{ + Tasks: func() []*Task { + res := make([]*Task, len(group.Tasks)) + for i, task := range group.Tasks { + res[i] = TaskFromDb(task) + } + return res + }(), + JoinType: JoinType(group.JoinType), + } +} + +func TaskFromDb(task *qdb.Task) *Task { + return &Task{ + ShardFromId: task.ShardFromId, + ShardToId: task.ShardToId, + KrIdFrom: task.KrIdFrom, + KrIdTo: task.KrIdTo, + KrIdTemp: task.KrIdTemp, + Bound: task.Bound, + State: TaskState(task.State), + } +} diff 
--git a/pkg/protos/shard.pb.go b/pkg/protos/shard.pb.go index 2789d0925..a279fd968 100644 --- a/pkg/protos/shard.pb.go +++ b/pkg/protos/shard.pb.go @@ -130,16 +130,16 @@ func (x *ShardInfo) GetHosts() []string { return nil } -type ShardInfoReply struct { +type ShardReply struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - ShardInfo *ShardInfo `protobuf:"bytes,1,opt,name=shard_info,json=shardInfo,proto3" json:"shard_info,omitempty"` + Shard *Shard `protobuf:"bytes,1,opt,name=shard,proto3" json:"shard,omitempty"` } -func (x *ShardInfoReply) Reset() { - *x = ShardInfoReply{} +func (x *ShardReply) Reset() { + *x = ShardReply{} if protoimpl.UnsafeEnabled { mi := &file_protos_shard_proto_msgTypes[2] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -147,13 +147,13 @@ func (x *ShardInfoReply) Reset() { } } -func (x *ShardInfoReply) String() string { +func (x *ShardReply) String() string { return protoimpl.X.MessageStringOf(x) } -func (*ShardInfoReply) ProtoMessage() {} +func (*ShardReply) ProtoMessage() {} -func (x *ShardInfoReply) ProtoReflect() protoreflect.Message { +func (x *ShardReply) ProtoReflect() protoreflect.Message { mi := &file_protos_shard_proto_msgTypes[2] if protoimpl.UnsafeEnabled && x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) @@ -165,14 +165,14 @@ func (x *ShardInfoReply) ProtoReflect() protoreflect.Message { return mi.MessageOf(x) } -// Deprecated: Use ShardInfoReply.ProtoReflect.Descriptor instead. -func (*ShardInfoReply) Descriptor() ([]byte, []int) { +// Deprecated: Use ShardReply.ProtoReflect.Descriptor instead. +func (*ShardReply) Descriptor() ([]byte, []int) { return file_protos_shard_proto_rawDescGZIP(), []int{2} } -func (x *ShardInfoReply) GetShardInfo() *ShardInfo { +func (x *ShardReply) GetShard() *Shard { if x != nil { - return x.ShardInfo + return x.Shard } return nil } @@ -451,45 +451,43 @@ var file_protos_shard_proto_rawDesc = []byte{ 0x28, 0x09, 0x52, 0x05, 0x68, 0x6f, 0x73, 0x74, 0x73, 0x22, 0x31, 0x0a, 0x09, 0x53, 0x68, 0x61, 0x72, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x68, 0x6f, 0x73, 0x74, 0x73, 0x18, - 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x68, 0x6f, 0x73, 0x74, 0x73, 0x22, 0x40, 0x0a, 0x0e, - 0x53, 0x68, 0x61, 0x72, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x2e, - 0x0a, 0x0a, 0x73, 0x68, 0x61, 0x72, 0x64, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x49, - 0x6e, 0x66, 0x6f, 0x52, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x13, - 0x0a, 0x11, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x22, 0x1e, 0x0a, 0x0c, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x02, 0x69, 0x64, 0x22, 0x36, 0x0a, 0x0f, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, - 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x23, 0x0a, 0x06, 0x73, 0x68, 0x61, 0x72, 0x64, 0x73, - 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x53, 0x68, - 0x61, 0x72, 0x64, 0x52, 0x06, 0x73, 0x68, 0x61, 0x72, 0x64, 0x73, 0x22, 0x34, 0x0a, 0x0f, 0x41, - 0x64, 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, - 0x0a, 0x05, 
0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, - 0x73, 0x70, 0x71, 0x72, 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x05, 0x73, 0x68, 0x61, 0x72, - 0x64, 0x22, 0x0f, 0x0a, 0x0d, 0x41, 0x64, 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x70, - 0x6c, 0x79, 0x22, 0x39, 0x0a, 0x14, 0x41, 0x64, 0x64, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x53, 0x68, - 0x61, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x05, 0x73, 0x68, + 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x68, 0x6f, 0x73, 0x74, 0x73, 0x22, 0x2f, 0x0a, 0x0a, + 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x21, 0x0a, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x73, 0x70, 0x71, 0x72, - 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x32, 0x8c, 0x02, - 0x0a, 0x0c, 0x53, 0x68, 0x61, 0x72, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3e, - 0x0a, 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, 0x12, 0x17, 0x2e, 0x73, - 0x70, 0x71, 0x72, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x4c, 0x69, 0x73, - 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x3c, - 0x0a, 0x0c, 0x41, 0x64, 0x64, 0x44, 0x61, 0x74, 0x61, 0x53, 0x68, 0x61, 0x72, 0x64, 0x12, 0x15, - 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x41, 0x64, 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x41, 0x64, 0x64, - 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x42, 0x0a, 0x0d, - 0x41, 0x64, 0x64, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x12, 0x1a, 0x2e, - 0x73, 0x70, 0x71, 0x72, 0x2e, 0x41, 0x64, 0x64, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x53, 0x68, 0x61, - 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x73, 0x70, 0x71, 0x72, - 0x2e, 0x41, 0x64, 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, - 0x12, 0x3a, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x49, 0x6e, 0x66, 0x6f, - 0x12, 0x12, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x53, 0x68, 0x61, 0x72, - 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, 0x0c, 0x5a, 0x0a, - 0x73, 0x70, 0x71, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x22, 0x13, 0x0a, + 0x11, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x22, 0x1e, 0x0a, 0x0c, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, + 0x69, 0x64, 0x22, 0x36, 0x0a, 0x0f, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, + 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x23, 0x0a, 0x06, 0x73, 0x68, 0x61, 0x72, 0x64, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x53, 0x68, 0x61, + 0x72, 0x64, 0x52, 0x06, 0x73, 0x68, 0x61, 0x72, 0x64, 0x73, 0x22, 0x34, 0x0a, 0x0f, 0x41, 0x64, + 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, 0x0a, + 0x05, 0x73, 0x68, 0x61, 
0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x73, + 0x70, 0x71, 0x72, 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, + 0x22, 0x0f, 0x0a, 0x0d, 0x41, 0x64, 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x39, 0x0a, 0x14, 0x41, 0x64, 0x64, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x53, 0x68, 0x61, + 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x05, 0x73, 0x68, 0x61, + 0x72, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, + 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x05, 0x73, 0x68, 0x61, 0x72, 0x64, 0x32, 0x84, 0x02, 0x0a, + 0x0c, 0x53, 0x68, 0x61, 0x72, 0x64, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x3e, 0x0a, + 0x0a, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, 0x12, 0x17, 0x2e, 0x73, 0x70, + 0x71, 0x72, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x4c, 0x69, 0x73, 0x74, + 0x53, 0x68, 0x61, 0x72, 0x64, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x3c, 0x0a, + 0x0c, 0x41, 0x64, 0x64, 0x44, 0x61, 0x74, 0x61, 0x53, 0x68, 0x61, 0x72, 0x64, 0x12, 0x15, 0x2e, + 0x73, 0x70, 0x71, 0x72, 0x2e, 0x41, 0x64, 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x41, 0x64, 0x64, 0x53, + 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x42, 0x0a, 0x0d, 0x41, + 0x64, 0x64, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x12, 0x1a, 0x2e, 0x73, + 0x70, 0x71, 0x72, 0x2e, 0x41, 0x64, 0x64, 0x57, 0x6f, 0x72, 0x6c, 0x64, 0x53, 0x68, 0x61, 0x72, + 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, + 0x41, 0x64, 0x64, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, + 0x32, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x68, 0x61, 0x72, 0x64, 0x12, 0x12, 0x2e, 0x73, 0x70, + 0x71, 0x72, 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x10, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x53, 0x68, 0x61, 0x72, 0x64, 0x52, 0x65, 0x70, 0x6c, + 0x79, 0x22, 0x00, 0x42, 0x0c, 0x5a, 0x0a, 0x73, 0x70, 0x71, 0x72, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -508,7 +506,7 @@ var file_protos_shard_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_protos_shard_proto_goTypes = []interface{}{ (*Shard)(nil), // 0: spqr.Shard (*ShardInfo)(nil), // 1: spqr.ShardInfo - (*ShardInfoReply)(nil), // 2: spqr.ShardInfoReply + (*ShardReply)(nil), // 2: spqr.ShardReply (*ListShardsRequest)(nil), // 3: spqr.ListShardsRequest (*ShardRequest)(nil), // 4: spqr.ShardRequest (*ListShardsReply)(nil), // 5: spqr.ListShardsReply @@ -517,18 +515,18 @@ var file_protos_shard_proto_goTypes = []interface{}{ (*AddWorldShardRequest)(nil), // 8: spqr.AddWorldShardRequest } var file_protos_shard_proto_depIdxs = []int32{ - 1, // 0: spqr.ShardInfoReply.shard_info:type_name -> spqr.ShardInfo + 0, // 0: spqr.ShardReply.shard:type_name -> spqr.Shard 0, // 1: spqr.ListShardsReply.shards:type_name -> spqr.Shard 0, // 2: spqr.AddShardRequest.shard:type_name -> spqr.Shard 0, // 3: spqr.AddWorldShardRequest.shard:type_name -> spqr.Shard 3, // 4: spqr.ShardService.ListShards:input_type -> spqr.ListShardsRequest 6, // 5: spqr.ShardService.AddDataShard:input_type -> spqr.AddShardRequest 8, // 6: 
spqr.ShardService.AddWorldShard:input_type -> spqr.AddWorldShardRequest - 4, // 7: spqr.ShardService.GetShardInfo:input_type -> spqr.ShardRequest + 4, // 7: spqr.ShardService.GetShard:input_type -> spqr.ShardRequest 5, // 8: spqr.ShardService.ListShards:output_type -> spqr.ListShardsReply 7, // 9: spqr.ShardService.AddDataShard:output_type -> spqr.AddShardReply 7, // 10: spqr.ShardService.AddWorldShard:output_type -> spqr.AddShardReply - 2, // 11: spqr.ShardService.GetShardInfo:output_type -> spqr.ShardInfoReply + 2, // 11: spqr.ShardService.GetShard:output_type -> spqr.ShardReply 8, // [8:12] is the sub-list for method output_type 4, // [4:8] is the sub-list for method input_type 4, // [4:4] is the sub-list for extension type_name @@ -567,7 +565,7 @@ func file_protos_shard_proto_init() { } } file_protos_shard_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*ShardInfoReply); i { + switch v := v.(*ShardReply); i { case 0: return &v.state case 1: diff --git a/pkg/protos/shard_grpc.pb.go b/pkg/protos/shard_grpc.pb.go index 52c2d55c1..f65a69ccf 100644 --- a/pkg/protos/shard_grpc.pb.go +++ b/pkg/protos/shard_grpc.pb.go @@ -22,7 +22,7 @@ const ( ShardService_ListShards_FullMethodName = "/spqr.ShardService/ListShards" ShardService_AddDataShard_FullMethodName = "/spqr.ShardService/AddDataShard" ShardService_AddWorldShard_FullMethodName = "/spqr.ShardService/AddWorldShard" - ShardService_GetShardInfo_FullMethodName = "/spqr.ShardService/GetShardInfo" + ShardService_GetShard_FullMethodName = "/spqr.ShardService/GetShard" ) // ShardServiceClient is the client API for ShardService service. @@ -32,7 +32,7 @@ type ShardServiceClient interface { ListShards(ctx context.Context, in *ListShardsRequest, opts ...grpc.CallOption) (*ListShardsReply, error) AddDataShard(ctx context.Context, in *AddShardRequest, opts ...grpc.CallOption) (*AddShardReply, error) AddWorldShard(ctx context.Context, in *AddWorldShardRequest, opts ...grpc.CallOption) (*AddShardReply, error) - GetShardInfo(ctx context.Context, in *ShardRequest, opts ...grpc.CallOption) (*ShardInfoReply, error) + GetShard(ctx context.Context, in *ShardRequest, opts ...grpc.CallOption) (*ShardReply, error) } type shardServiceClient struct { @@ -70,9 +70,9 @@ func (c *shardServiceClient) AddWorldShard(ctx context.Context, in *AddWorldShar return out, nil } -func (c *shardServiceClient) GetShardInfo(ctx context.Context, in *ShardRequest, opts ...grpc.CallOption) (*ShardInfoReply, error) { - out := new(ShardInfoReply) - err := c.cc.Invoke(ctx, ShardService_GetShardInfo_FullMethodName, in, out, opts...) +func (c *shardServiceClient) GetShard(ctx context.Context, in *ShardRequest, opts ...grpc.CallOption) (*ShardReply, error) { + out := new(ShardReply) + err := c.cc.Invoke(ctx, ShardService_GetShard_FullMethodName, in, out, opts...) 
if err != nil { return nil, err } @@ -86,7 +86,7 @@ type ShardServiceServer interface { ListShards(context.Context, *ListShardsRequest) (*ListShardsReply, error) AddDataShard(context.Context, *AddShardRequest) (*AddShardReply, error) AddWorldShard(context.Context, *AddWorldShardRequest) (*AddShardReply, error) - GetShardInfo(context.Context, *ShardRequest) (*ShardInfoReply, error) + GetShard(context.Context, *ShardRequest) (*ShardReply, error) mustEmbedUnimplementedShardServiceServer() } @@ -103,8 +103,8 @@ func (UnimplementedShardServiceServer) AddDataShard(context.Context, *AddShardRe func (UnimplementedShardServiceServer) AddWorldShard(context.Context, *AddWorldShardRequest) (*AddShardReply, error) { return nil, status.Errorf(codes.Unimplemented, "method AddWorldShard not implemented") } -func (UnimplementedShardServiceServer) GetShardInfo(context.Context, *ShardRequest) (*ShardInfoReply, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetShardInfo not implemented") +func (UnimplementedShardServiceServer) GetShard(context.Context, *ShardRequest) (*ShardReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetShard not implemented") } func (UnimplementedShardServiceServer) mustEmbedUnimplementedShardServiceServer() {} @@ -173,20 +173,20 @@ func _ShardService_AddWorldShard_Handler(srv interface{}, ctx context.Context, d return interceptor(ctx, in, info, handler) } -func _ShardService_GetShardInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { +func _ShardService_GetShard_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(ShardRequest) if err := dec(in); err != nil { return nil, err } if interceptor == nil { - return srv.(ShardServiceServer).GetShardInfo(ctx, in) + return srv.(ShardServiceServer).GetShard(ctx, in) } info := &grpc.UnaryServerInfo{ Server: srv, - FullMethod: ShardService_GetShardInfo_FullMethodName, + FullMethod: ShardService_GetShard_FullMethodName, } handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(ShardServiceServer).GetShardInfo(ctx, req.(*ShardRequest)) + return srv.(ShardServiceServer).GetShard(ctx, req.(*ShardRequest)) } return interceptor(ctx, in, info, handler) } @@ -211,8 +211,8 @@ var ShardService_ServiceDesc = grpc.ServiceDesc{ Handler: _ShardService_AddWorldShard_Handler, }, { - MethodName: "GetShardInfo", - Handler: _ShardService_GetShardInfo_Handler, + MethodName: "GetShard", + Handler: _ShardService_GetShard_Handler, }, }, Streams: []grpc.StreamDesc{}, diff --git a/pkg/protos/tasks.pb.go b/pkg/protos/tasks.pb.go new file mode 100644 index 000000000..2ab93d603 --- /dev/null +++ b/pkg/protos/tasks.pb.go @@ -0,0 +1,750 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.32.0 +// protoc v3.21.12 +// source: protos/tasks.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type TaskStatus int32 + +const ( + TaskStatus_Planned TaskStatus = 0 + TaskStatus_Split TaskStatus = 1 + TaskStatus_Moved TaskStatus = 2 +) + +// Enum value maps for TaskStatus. +var ( + TaskStatus_name = map[int32]string{ + 0: "Planned", + 1: "Split", + 2: "Moved", + } + TaskStatus_value = map[string]int32{ + "Planned": 0, + "Split": 1, + "Moved": 2, + } +) + +func (x TaskStatus) Enum() *TaskStatus { + p := new(TaskStatus) + *p = x + return p +} + +func (x TaskStatus) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (TaskStatus) Descriptor() protoreflect.EnumDescriptor { + return file_protos_tasks_proto_enumTypes[0].Descriptor() +} + +func (TaskStatus) Type() protoreflect.EnumType { + return &file_protos_tasks_proto_enumTypes[0] +} + +func (x TaskStatus) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use TaskStatus.Descriptor instead. +func (TaskStatus) EnumDescriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{0} +} + +type JoinType int32 + +const ( + JoinType_JoinNone JoinType = 0 + JoinType_JoinLeft JoinType = 1 + JoinType_JoinRight JoinType = 2 +) + +// Enum value maps for JoinType. +var ( + JoinType_name = map[int32]string{ + 0: "JoinNone", + 1: "JoinLeft", + 2: "JoinRight", + } + JoinType_value = map[string]int32{ + "JoinNone": 0, + "JoinLeft": 1, + "JoinRight": 2, + } +) + +func (x JoinType) Enum() *JoinType { + p := new(JoinType) + *p = x + return p +} + +func (x JoinType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (JoinType) Descriptor() protoreflect.EnumDescriptor { + return file_protos_tasks_proto_enumTypes[1].Descriptor() +} + +func (JoinType) Type() protoreflect.EnumType { + return &file_protos_tasks_proto_enumTypes[1] +} + +func (x JoinType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use JoinType.Descriptor instead. 
+func (JoinType) EnumDescriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{1} +} + +type Task struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ShardIdFrom string `protobuf:"bytes,1,opt,name=shardIdFrom,proto3" json:"shardIdFrom,omitempty"` + ShardIdTo string `protobuf:"bytes,2,opt,name=shardIdTo,proto3" json:"shardIdTo,omitempty"` + KeyRangeIdFrom string `protobuf:"bytes,3,opt,name=keyRangeIdFrom,proto3" json:"keyRangeIdFrom,omitempty"` + KeyRangeIdTo string `protobuf:"bytes,4,opt,name=keyRangeIdTo,proto3" json:"keyRangeIdTo,omitempty"` + KeyRangeIdTemp string `protobuf:"bytes,5,opt,name=keyRangeIdTemp,proto3" json:"keyRangeIdTemp,omitempty"` + Bound []byte `protobuf:"bytes,6,opt,name=bound,proto3" json:"bound,omitempty"` + Status TaskStatus `protobuf:"varint,7,opt,name=status,proto3,enum=spqr.TaskStatus" json:"status,omitempty"` +} + +func (x *Task) Reset() { + *x = Task{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_tasks_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Task) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Task) ProtoMessage() {} + +func (x *Task) ProtoReflect() protoreflect.Message { + mi := &file_protos_tasks_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Task.ProtoReflect.Descriptor instead. +func (*Task) Descriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{0} +} + +func (x *Task) GetShardIdFrom() string { + if x != nil { + return x.ShardIdFrom + } + return "" +} + +func (x *Task) GetShardIdTo() string { + if x != nil { + return x.ShardIdTo + } + return "" +} + +func (x *Task) GetKeyRangeIdFrom() string { + if x != nil { + return x.KeyRangeIdFrom + } + return "" +} + +func (x *Task) GetKeyRangeIdTo() string { + if x != nil { + return x.KeyRangeIdTo + } + return "" +} + +func (x *Task) GetKeyRangeIdTemp() string { + if x != nil { + return x.KeyRangeIdTemp + } + return "" +} + +func (x *Task) GetBound() []byte { + if x != nil { + return x.Bound + } + return nil +} + +func (x *Task) GetStatus() TaskStatus { + if x != nil { + return x.Status + } + return TaskStatus_Planned +} + +type TaskGroup struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Tasks []*Task `protobuf:"bytes,1,rep,name=tasks,proto3" json:"tasks,omitempty"` + JoinType JoinType `protobuf:"varint,2,opt,name=joinType,proto3,enum=spqr.JoinType" json:"joinType,omitempty"` +} + +func (x *TaskGroup) Reset() { + *x = TaskGroup{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_tasks_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TaskGroup) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TaskGroup) ProtoMessage() {} + +func (x *TaskGroup) ProtoReflect() protoreflect.Message { + mi := &file_protos_tasks_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TaskGroup.ProtoReflect.Descriptor instead. 
+func (*TaskGroup) Descriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{1} +} + +func (x *TaskGroup) GetTasks() []*Task { + if x != nil { + return x.Tasks + } + return nil +} + +func (x *TaskGroup) GetJoinType() JoinType { + if x != nil { + return x.JoinType + } + return JoinType_JoinNone +} + +type GetTaskGroupRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetTaskGroupRequest) Reset() { + *x = GetTaskGroupRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_tasks_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetTaskGroupRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetTaskGroupRequest) ProtoMessage() {} + +func (x *GetTaskGroupRequest) ProtoReflect() protoreflect.Message { + mi := &file_protos_tasks_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetTaskGroupRequest.ProtoReflect.Descriptor instead. +func (*GetTaskGroupRequest) Descriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{2} +} + +type GetTaskGroupReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TaskGroup *TaskGroup `protobuf:"bytes,1,opt,name=taskGroup,proto3" json:"taskGroup,omitempty"` +} + +func (x *GetTaskGroupReply) Reset() { + *x = GetTaskGroupReply{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_tasks_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetTaskGroupReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetTaskGroupReply) ProtoMessage() {} + +func (x *GetTaskGroupReply) ProtoReflect() protoreflect.Message { + mi := &file_protos_tasks_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetTaskGroupReply.ProtoReflect.Descriptor instead. 
+func (*GetTaskGroupReply) Descriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{3} +} + +func (x *GetTaskGroupReply) GetTaskGroup() *TaskGroup { + if x != nil { + return x.TaskGroup + } + return nil +} + +type WriteTaskGroupRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TaskGroup *TaskGroup `protobuf:"bytes,1,opt,name=taskGroup,proto3" json:"taskGroup,omitempty"` +} + +func (x *WriteTaskGroupRequest) Reset() { + *x = WriteTaskGroupRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_tasks_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WriteTaskGroupRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteTaskGroupRequest) ProtoMessage() {} + +func (x *WriteTaskGroupRequest) ProtoReflect() protoreflect.Message { + mi := &file_protos_tasks_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteTaskGroupRequest.ProtoReflect.Descriptor instead. +func (*WriteTaskGroupRequest) Descriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{4} +} + +func (x *WriteTaskGroupRequest) GetTaskGroup() *TaskGroup { + if x != nil { + return x.TaskGroup + } + return nil +} + +type WriteTaskGroupReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *WriteTaskGroupReply) Reset() { + *x = WriteTaskGroupReply{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_tasks_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *WriteTaskGroupReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*WriteTaskGroupReply) ProtoMessage() {} + +func (x *WriteTaskGroupReply) ProtoReflect() protoreflect.Message { + mi := &file_protos_tasks_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use WriteTaskGroupReply.ProtoReflect.Descriptor instead. +func (*WriteTaskGroupReply) Descriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{5} +} + +type RemoveTaskGroupRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *RemoveTaskGroupRequest) Reset() { + *x = RemoveTaskGroupRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_tasks_proto_msgTypes[6] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RemoveTaskGroupRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RemoveTaskGroupRequest) ProtoMessage() {} + +func (x *RemoveTaskGroupRequest) ProtoReflect() protoreflect.Message { + mi := &file_protos_tasks_proto_msgTypes[6] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RemoveTaskGroupRequest.ProtoReflect.Descriptor instead. 
+func (*RemoveTaskGroupRequest) Descriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{6} +} + +type RemoveTaskGroupReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *RemoveTaskGroupReply) Reset() { + *x = RemoveTaskGroupReply{} + if protoimpl.UnsafeEnabled { + mi := &file_protos_tasks_proto_msgTypes[7] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RemoveTaskGroupReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RemoveTaskGroupReply) ProtoMessage() {} + +func (x *RemoveTaskGroupReply) ProtoReflect() protoreflect.Message { + mi := &file_protos_tasks_proto_msgTypes[7] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RemoveTaskGroupReply.ProtoReflect.Descriptor instead. +func (*RemoveTaskGroupReply) Descriptor() ([]byte, []int) { + return file_protos_tasks_proto_rawDescGZIP(), []int{7} +} + +var File_protos_tasks_proto protoreflect.FileDescriptor + +var file_protos_tasks_proto_rawDesc = []byte{ + 0x0a, 0x12, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x73, 0x70, 0x71, 0x72, 0x22, 0xfa, 0x01, 0x0a, 0x04, 0x54, + 0x61, 0x73, 0x6b, 0x12, 0x20, 0x0a, 0x0b, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, 0x46, 0x72, + 0x6f, 0x6d, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0b, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, + 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x12, 0x1c, 0x0a, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, 0x64, + 0x54, 0x6f, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x73, 0x68, 0x61, 0x72, 0x64, 0x49, + 0x64, 0x54, 0x6f, 0x12, 0x26, 0x0a, 0x0e, 0x6b, 0x65, 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x49, + 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x6b, 0x65, 0x79, + 0x52, 0x61, 0x6e, 0x67, 0x65, 0x49, 0x64, 0x46, 0x72, 0x6f, 0x6d, 0x12, 0x22, 0x0a, 0x0c, 0x6b, + 0x65, 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x49, 0x64, 0x54, 0x6f, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0c, 0x6b, 0x65, 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x49, 0x64, 0x54, 0x6f, 0x12, + 0x26, 0x0a, 0x0e, 0x6b, 0x65, 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x49, 0x64, 0x54, 0x65, 0x6d, + 0x70, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0e, 0x6b, 0x65, 0x79, 0x52, 0x61, 0x6e, 0x67, + 0x65, 0x49, 0x64, 0x54, 0x65, 0x6d, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x62, 0x6f, 0x75, 0x6e, 0x64, + 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x12, 0x28, 0x0a, + 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x10, 0x2e, + 0x73, 0x70, 0x71, 0x72, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, + 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x22, 0x59, 0x0a, 0x09, 0x54, 0x61, 0x73, 0x6b, 0x47, + 0x72, 0x6f, 0x75, 0x70, 0x12, 0x20, 0x0a, 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x18, 0x01, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x52, + 0x05, 0x74, 0x61, 0x73, 0x6b, 0x73, 0x12, 0x2a, 0x0a, 0x08, 0x6a, 0x6f, 0x69, 0x6e, 0x54, 0x79, + 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0e, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, + 0x4a, 0x6f, 0x69, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x08, 0x6a, 0x6f, 0x69, 0x6e, 0x54, 0x79, + 0x70, 0x65, 
0x22, 0x15, 0x0a, 0x13, 0x47, 0x65, 0x74, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, + 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x42, 0x0a, 0x11, 0x47, 0x65, 0x74, + 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x2d, + 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x0b, 0x32, 0x0f, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, + 0x75, 0x70, 0x52, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x22, 0x46, 0x0a, + 0x15, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2d, 0x0a, 0x09, 0x74, 0x61, 0x73, 0x6b, 0x47, 0x72, + 0x6f, 0x75, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x73, 0x70, 0x71, 0x72, + 0x2e, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x52, 0x09, 0x74, 0x61, 0x73, 0x6b, + 0x47, 0x72, 0x6f, 0x75, 0x70, 0x22, 0x15, 0x0a, 0x13, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x61, + 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x18, 0x0a, 0x16, + 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x16, 0x0a, 0x14, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, + 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x2a, 0x2f, + 0x0a, 0x0a, 0x54, 0x61, 0x73, 0x6b, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, + 0x50, 0x6c, 0x61, 0x6e, 0x6e, 0x65, 0x64, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x53, 0x70, 0x6c, + 0x69, 0x74, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x6f, 0x76, 0x65, 0x64, 0x10, 0x02, 0x2a, + 0x35, 0x0a, 0x08, 0x4a, 0x6f, 0x69, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x4a, + 0x6f, 0x69, 0x6e, 0x4e, 0x6f, 0x6e, 0x65, 0x10, 0x00, 0x12, 0x0c, 0x0a, 0x08, 0x4a, 0x6f, 0x69, + 0x6e, 0x4c, 0x65, 0x66, 0x74, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x4a, 0x6f, 0x69, 0x6e, 0x52, + 0x69, 0x67, 0x68, 0x74, 0x10, 0x02, 0x32, 0xef, 0x01, 0x0a, 0x0c, 0x54, 0x61, 0x73, 0x6b, 0x73, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x44, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x54, 0x61, + 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x19, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x47, + 0x65, 0x74, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x61, 0x73, + 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x4a, 0x0a, + 0x0e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, + 0x1b, 0x2e, 0x73, 0x70, 0x71, 0x72, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x61, 0x73, 0x6b, + 0x47, 0x72, 0x6f, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x19, 0x2e, 0x73, + 0x70, 0x71, 0x72, 0x2e, 0x57, 0x72, 0x69, 0x74, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, + 0x75, 0x70, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x12, 0x4d, 0x0a, 0x0f, 0x52, 0x65, 0x6d, + 0x6f, 0x76, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x12, 0x1c, 0x2e, 0x73, + 0x70, 0x71, 0x72, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, + 0x6f, 0x75, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x73, 0x70, 0x71, + 0x72, 0x2e, 0x52, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x47, 0x72, 0x6f, 0x75, + 0x70, 0x52, 0x65, 0x70, 0x6c, 0x79, 
0x22, 0x00, 0x42, 0x0c, 0x5a, 0x0a, 0x73, 0x70, 0x71, 0x72, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_protos_tasks_proto_rawDescOnce sync.Once + file_protos_tasks_proto_rawDescData = file_protos_tasks_proto_rawDesc +) + +func file_protos_tasks_proto_rawDescGZIP() []byte { + file_protos_tasks_proto_rawDescOnce.Do(func() { + file_protos_tasks_proto_rawDescData = protoimpl.X.CompressGZIP(file_protos_tasks_proto_rawDescData) + }) + return file_protos_tasks_proto_rawDescData +} + +var file_protos_tasks_proto_enumTypes = make([]protoimpl.EnumInfo, 2) +var file_protos_tasks_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_protos_tasks_proto_goTypes = []interface{}{ + (TaskStatus)(0), // 0: spqr.TaskStatus + (JoinType)(0), // 1: spqr.JoinType + (*Task)(nil), // 2: spqr.Task + (*TaskGroup)(nil), // 3: spqr.TaskGroup + (*GetTaskGroupRequest)(nil), // 4: spqr.GetTaskGroupRequest + (*GetTaskGroupReply)(nil), // 5: spqr.GetTaskGroupReply + (*WriteTaskGroupRequest)(nil), // 6: spqr.WriteTaskGroupRequest + (*WriteTaskGroupReply)(nil), // 7: spqr.WriteTaskGroupReply + (*RemoveTaskGroupRequest)(nil), // 8: spqr.RemoveTaskGroupRequest + (*RemoveTaskGroupReply)(nil), // 9: spqr.RemoveTaskGroupReply +} +var file_protos_tasks_proto_depIdxs = []int32{ + 0, // 0: spqr.Task.status:type_name -> spqr.TaskStatus + 2, // 1: spqr.TaskGroup.tasks:type_name -> spqr.Task + 1, // 2: spqr.TaskGroup.joinType:type_name -> spqr.JoinType + 3, // 3: spqr.GetTaskGroupReply.taskGroup:type_name -> spqr.TaskGroup + 3, // 4: spqr.WriteTaskGroupRequest.taskGroup:type_name -> spqr.TaskGroup + 4, // 5: spqr.TasksService.GetTaskGroup:input_type -> spqr.GetTaskGroupRequest + 6, // 6: spqr.TasksService.WriteTaskGroup:input_type -> spqr.WriteTaskGroupRequest + 8, // 7: spqr.TasksService.RemoveTaskGroup:input_type -> spqr.RemoveTaskGroupRequest + 5, // 8: spqr.TasksService.GetTaskGroup:output_type -> spqr.GetTaskGroupReply + 7, // 9: spqr.TasksService.WriteTaskGroup:output_type -> spqr.WriteTaskGroupReply + 9, // 10: spqr.TasksService.RemoveTaskGroup:output_type -> spqr.RemoveTaskGroupReply + 8, // [8:11] is the sub-list for method output_type + 5, // [5:8] is the sub-list for method input_type + 5, // [5:5] is the sub-list for extension type_name + 5, // [5:5] is the sub-list for extension extendee + 0, // [0:5] is the sub-list for field type_name +} + +func init() { file_protos_tasks_proto_init() } +func file_protos_tasks_proto_init() { + if File_protos_tasks_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_protos_tasks_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Task); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_tasks_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TaskGroup); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_tasks_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetTaskGroupRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_tasks_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetTaskGroupReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + 
return &v.unknownFields + default: + return nil + } + } + file_protos_tasks_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WriteTaskGroupRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_tasks_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*WriteTaskGroupReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_tasks_proto_msgTypes[6].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RemoveTaskGroupRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_protos_tasks_proto_msgTypes[7].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RemoveTaskGroupReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_protos_tasks_proto_rawDesc, + NumEnums: 2, + NumMessages: 8, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_protos_tasks_proto_goTypes, + DependencyIndexes: file_protos_tasks_proto_depIdxs, + EnumInfos: file_protos_tasks_proto_enumTypes, + MessageInfos: file_protos_tasks_proto_msgTypes, + }.Build() + File_protos_tasks_proto = out.File + file_protos_tasks_proto_rawDesc = nil + file_protos_tasks_proto_goTypes = nil + file_protos_tasks_proto_depIdxs = nil +} diff --git a/pkg/protos/tasks_grpc.pb.go b/pkg/protos/tasks_grpc.pb.go new file mode 100644 index 000000000..e3f442ba1 --- /dev/null +++ b/pkg/protos/tasks_grpc.pb.go @@ -0,0 +1,183 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v3.21.12 +// source: protos/tasks.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + TasksService_GetTaskGroup_FullMethodName = "/spqr.TasksService/GetTaskGroup" + TasksService_WriteTaskGroup_FullMethodName = "/spqr.TasksService/WriteTaskGroup" + TasksService_RemoveTaskGroup_FullMethodName = "/spqr.TasksService/RemoveTaskGroup" +) + +// TasksServiceClient is the client API for TasksService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. 
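+//
+// Minimal usage sketch (assumes a server exposing spqr.TasksService on
+// localhost:7003 and this package imported as proto; credentials and error
+// handling are simplified, and names below are illustrative):
+//
+//	conn, _ := grpc.Dial("localhost:7003", grpc.WithTransportCredentials(insecure.NewCredentials()))
+//	tasksClient := proto.NewTasksServiceClient(conn)
+//	reply, _ := tasksClient.GetTaskGroup(context.TODO(), &proto.GetTaskGroupRequest{})
+//	_ = reply.GetTaskGroup() // nil-safe accessor, returns *proto.TaskGroup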
+type TasksServiceClient interface { + GetTaskGroup(ctx context.Context, in *GetTaskGroupRequest, opts ...grpc.CallOption) (*GetTaskGroupReply, error) + WriteTaskGroup(ctx context.Context, in *WriteTaskGroupRequest, opts ...grpc.CallOption) (*WriteTaskGroupReply, error) + RemoveTaskGroup(ctx context.Context, in *RemoveTaskGroupRequest, opts ...grpc.CallOption) (*RemoveTaskGroupReply, error) +} + +type tasksServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewTasksServiceClient(cc grpc.ClientConnInterface) TasksServiceClient { + return &tasksServiceClient{cc} +} + +func (c *tasksServiceClient) GetTaskGroup(ctx context.Context, in *GetTaskGroupRequest, opts ...grpc.CallOption) (*GetTaskGroupReply, error) { + out := new(GetTaskGroupReply) + err := c.cc.Invoke(ctx, TasksService_GetTaskGroup_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tasksServiceClient) WriteTaskGroup(ctx context.Context, in *WriteTaskGroupRequest, opts ...grpc.CallOption) (*WriteTaskGroupReply, error) { + out := new(WriteTaskGroupReply) + err := c.cc.Invoke(ctx, TasksService_WriteTaskGroup_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *tasksServiceClient) RemoveTaskGroup(ctx context.Context, in *RemoveTaskGroupRequest, opts ...grpc.CallOption) (*RemoveTaskGroupReply, error) { + out := new(RemoveTaskGroupReply) + err := c.cc.Invoke(ctx, TasksService_RemoveTaskGroup_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TasksServiceServer is the server API for TasksService service. +// All implementations must embed UnimplementedTasksServiceServer +// for forward compatibility +type TasksServiceServer interface { + GetTaskGroup(context.Context, *GetTaskGroupRequest) (*GetTaskGroupReply, error) + WriteTaskGroup(context.Context, *WriteTaskGroupRequest) (*WriteTaskGroupReply, error) + RemoveTaskGroup(context.Context, *RemoveTaskGroupRequest) (*RemoveTaskGroupReply, error) + mustEmbedUnimplementedTasksServiceServer() +} + +// UnimplementedTasksServiceServer must be embedded to have forward compatible implementations. +type UnimplementedTasksServiceServer struct { +} + +func (UnimplementedTasksServiceServer) GetTaskGroup(context.Context, *GetTaskGroupRequest) (*GetTaskGroupReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetTaskGroup not implemented") +} +func (UnimplementedTasksServiceServer) WriteTaskGroup(context.Context, *WriteTaskGroupRequest) (*WriteTaskGroupReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method WriteTaskGroup not implemented") +} +func (UnimplementedTasksServiceServer) RemoveTaskGroup(context.Context, *RemoveTaskGroupRequest) (*RemoveTaskGroupReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method RemoveTaskGroup not implemented") +} +func (UnimplementedTasksServiceServer) mustEmbedUnimplementedTasksServiceServer() {} + +// UnsafeTasksServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TasksServiceServer will +// result in compilation errors. 
+type UnsafeTasksServiceServer interface { + mustEmbedUnimplementedTasksServiceServer() +} + +func RegisterTasksServiceServer(s grpc.ServiceRegistrar, srv TasksServiceServer) { + s.RegisterService(&TasksService_ServiceDesc, srv) +} + +func _TasksService_GetTaskGroup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetTaskGroupRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TasksServiceServer).GetTaskGroup(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: TasksService_GetTaskGroup_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TasksServiceServer).GetTaskGroup(ctx, req.(*GetTaskGroupRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _TasksService_WriteTaskGroup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(WriteTaskGroupRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TasksServiceServer).WriteTaskGroup(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: TasksService_WriteTaskGroup_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TasksServiceServer).WriteTaskGroup(ctx, req.(*WriteTaskGroupRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _TasksService_RemoveTaskGroup_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RemoveTaskGroupRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TasksServiceServer).RemoveTaskGroup(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: TasksService_RemoveTaskGroup_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TasksServiceServer).RemoveTaskGroup(ctx, req.(*RemoveTaskGroupRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// TasksService_ServiceDesc is the grpc.ServiceDesc for TasksService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var TasksService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "spqr.TasksService", + HandlerType: (*TasksServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetTaskGroup", + Handler: _TasksService_GetTaskGroup_Handler, + }, + { + MethodName: "WriteTaskGroup", + Handler: _TasksService_WriteTaskGroup_Handler, + }, + { + MethodName: "RemoveTaskGroup", + Handler: _TasksService_RemoveTaskGroup_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "protos/tasks.proto", +} diff --git a/protos/shard.proto b/protos/shard.proto index c22f566b7..8cda6b9ef 100644 --- a/protos/shard.proto +++ b/protos/shard.proto @@ -14,16 +14,15 @@ message ShardInfo { repeated string hosts = 2; } -//TODO is ListShards returns all types of shards or only data-shards? 
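+// GetShard replaces the former GetShardInfo RPC: it looks up a single shard
+// by id and returns the Shard message itself (wrapped in ShardReply) instead
+// of the previous ShardInfoReply around ShardInfo.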
service ShardService { rpc ListShards (ListShardsRequest) returns (ListShardsReply) {} rpc AddDataShard (AddShardRequest) returns (AddShardReply) {} rpc AddWorldShard (AddWorldShardRequest) returns (AddShardReply) {} - rpc GetShardInfo (ShardRequest) returns (ShardInfoReply) {} + rpc GetShard (ShardRequest) returns (ShardReply) {} } -message ShardInfoReply { - ShardInfo shard_info = 1; +message ShardReply { + Shard shard = 1; } message ListShardsRequest { diff --git a/protos/tasks.proto b/protos/tasks.proto new file mode 100644 index 000000000..9dde7be4a --- /dev/null +++ b/protos/tasks.proto @@ -0,0 +1,51 @@ +syntax = "proto3"; + +package spqr; + +option go_package = "spqr/proto"; + +enum TaskStatus { + Planned = 0; + Split = 1; + Moved = 2; +} + +message Task { + string shardIdFrom = 1; + string shardIdTo = 2; + string keyRangeIdFrom = 3; + string keyRangeIdTo = 4; + string keyRangeIdTemp = 5; + bytes bound = 6; + TaskStatus status = 7; +} + +enum JoinType { + JoinNone = 0; + JoinLeft = 1; + JoinRight = 2; +} + +message TaskGroup { + repeated Task tasks = 1; + JoinType joinType = 2; +} + +message GetTaskGroupRequest{} +message GetTaskGroupReply { + TaskGroup taskGroup = 1; +} + +message WriteTaskGroupRequest{ + TaskGroup taskGroup = 1; +} +message WriteTaskGroupReply {} + +message RemoveTaskGroupRequest{} +message RemoveTaskGroupReply{} + +service TasksService { + rpc GetTaskGroup(GetTaskGroupRequest) returns (GetTaskGroupReply) {} + rpc WriteTaskGroup(WriteTaskGroupRequest) returns(WriteTaskGroupReply) {} + rpc RemoveTaskGroup(RemoveTaskGroupRequest) returns(RemoveTaskGroupReply) {} +} diff --git a/qdb/etcdqdb.go b/qdb/etcdqdb.go index 18eca09bb..082b409ac 100644 --- a/qdb/etcdqdb.go +++ b/qdb/etcdqdb.go @@ -55,6 +55,7 @@ const ( routersNamespace = "/routers/" shardsNamespace = "/shards/" relationMappingNamespace = "/relation_mappings/" + taskGroupPath = "/move_task_group" CoordKeepAliveTtl = 3 keyspace = "key_space" @@ -1055,6 +1056,54 @@ func (q *EtcdQDB) GetRelationDistribution(ctx context.Context, relName string) ( } } +// ============================================================================== +// TASKS +// ============================================================================== + +func (q *EtcdQDB) GetTaskGroup(ctx context.Context) (*TaskGroup, error) { + spqrlog.Zero.Debug(). + Msg("etcdqdb: get task group") + + resp, err := q.cli.Get(ctx, taskGroupPath) + if err != nil { + return nil, err + } + + if len(resp.Kvs) == 0 { + return &TaskGroup{ + Tasks: []*Task{}, + }, nil + } + + var taskGroup *TaskGroup + if err := json.Unmarshal(resp.Kvs[0].Value, &taskGroup); err != nil { + return nil, err + } + + return taskGroup, nil +} + +func (q *EtcdQDB) WriteTaskGroup(ctx context.Context, group *TaskGroup) error { + spqrlog.Zero.Debug(). + Msg("etcdqdb: write task group") + + groupJson, err := json.Marshal(group) + if err != nil { + return err + } + + _, err = q.cli.Put(ctx, taskGroupPath, string(groupJson)) + return err +} + +func (q *EtcdQDB) RemoveTaskGroup(ctx context.Context) error { + spqrlog.Zero.Debug(). 
+ Msg("etcdqdb: remove task group") + + _, err := q.cli.Delete(ctx, taskGroupPath) + return err +} + // ============================================================================== // KEY RANGE MOVES // ============================================================================== diff --git a/qdb/memqdb.go b/qdb/memqdb.go index f6f81f29c..ed1a89a03 100644 --- a/qdb/memqdb.go +++ b/qdb/memqdb.go @@ -29,6 +29,7 @@ type MemQDB struct { Routers map[string]*Router `json:"routers"` Transactions map[string]*DataTransferTransaction `json:"transactions"` Coordinator string `json:"coordinator"` + TaskGroup *TaskGroup `json:"taskGroup"` backupPath string /* caches */ @@ -713,3 +714,38 @@ func (q *MemQDB) GetRelationDistribution(_ context.Context, relation string) (*D return q.Distributions[ds], nil } } + +// ============================================================================== +// TASKS +// ============================================================================== + +func (q *MemQDB) GetTaskGroup(_ context.Context) (*TaskGroup, error) { + spqrlog.Zero.Debug().Msg("memqdb: get task group") + q.mu.RLock() + defer q.mu.RUnlock() + + if q.TaskGroup == nil { + return &TaskGroup{ + Tasks: []*Task{}, + }, nil + } + return q.TaskGroup, nil +} + +func (q *MemQDB) WriteTaskGroup(_ context.Context, group *TaskGroup) error { + spqrlog.Zero.Debug().Msg("memqdb: write task group") + q.mu.Lock() + defer q.mu.Unlock() + + q.TaskGroup = group + return nil +} + +func (q *MemQDB) RemoveTaskGroup(ctx context.Context) error { + spqrlog.Zero.Debug().Msg("memqdb: remove task group") + q.mu.Lock() + defer q.mu.Unlock() + + q.TaskGroup = nil + return nil +} diff --git a/qdb/models.go b/qdb/models.go index dbfacc4f8..3f15c974f 100644 --- a/qdb/models.go +++ b/qdb/models.go @@ -100,3 +100,18 @@ func NewDistribution(id string, coltypes []string) *Distribution { return distr } + +type Task struct { + ShardFromId string + ShardToId string + KrIdFrom string + KrIdTo string + Bound []byte + KrIdTemp string + State int +} + +type TaskGroup struct { + Tasks []*Task + JoinType int +} diff --git a/qdb/qdb.go b/qdb/qdb.go index ad091c6ac..9087ff854 100644 --- a/qdb/qdb.go +++ b/qdb/qdb.go @@ -3,7 +3,6 @@ package qdb import ( "context" "fmt" - "github.com/pg-sharding/spqr/pkg/config" ) @@ -73,6 +72,10 @@ type QDB interface { // TODO: fix this by passing FQRN (fully qualified relation name (+schema)) GetRelationDistribution(ctx context.Context, relation string) (*Distribution, error) + GetTaskGroup(ctx context.Context) (*TaskGroup, error) + WriteTaskGroup(ctx context.Context, group *TaskGroup) error + RemoveTaskGroup(ctx context.Context) error + UpdateCoordinator(ctx context.Context, address string) error GetCoordinator(ctx context.Context) (string, error) } diff --git a/router/grpc/qrouter.go b/router/grpc/qrouter.go index b48dff973..22c9d36a3 100644 --- a/router/grpc/qrouter.go +++ b/router/grpc/qrouter.go @@ -3,8 +3,10 @@ package grpc import ( "context" "fmt" + "github.com/pg-sharding/spqr/pkg/models/datashards" "github.com/pg-sharding/spqr/pkg/models/distributions" "github.com/pg-sharding/spqr/pkg/models/spqrerror" + "github.com/pg-sharding/spqr/pkg/models/tasks" "github.com/pg-sharding/spqr/pkg/client" "github.com/pg-sharding/spqr/pkg/meta" @@ -26,11 +28,51 @@ type LocalQrouterServer struct { protos.UnimplementedBackendConnectionsServiceServer protos.UnimplementedPoolServiceServer protos.UnimplementedDistributionServiceServer + protos.UnimplementedTasksServiceServer + protos.UnimplementedShardServiceServer qr 
qrouter.QueryRouter mgr meta.EntityMgr rr rulerouter.RuleRouter } +func (l *LocalQrouterServer) ListShards(ctx context.Context, _ *protos.ListShardsRequest) (*protos.ListShardsReply, error) { + shards, err := l.mgr.ListShards(ctx) + if err != nil { + return nil, err + } + return &protos.ListShardsReply{ + Shards: func() []*protos.Shard { + res := make([]*protos.Shard, len(shards)) + for i, sh := range shards { + res[i] = datashards.DataShardToProto(sh) + } + return res + }(), + }, nil +} + +func (l *LocalQrouterServer) AddDataShard(ctx context.Context, request *protos.AddShardRequest) (*protos.AddShardReply, error) { + if err := l.mgr.AddDataShard(ctx, datashards.DataShardFromProto(request.GetShard())); err != nil { + return nil, err + } + return &protos.AddShardReply{}, nil +} + +func (l *LocalQrouterServer) AddWorldShard(ctx context.Context, request *protos.AddWorldShardRequest) (*protos.AddShardReply, error) { + //TODO implement me + panic("implement me") +} + +func (l *LocalQrouterServer) GetShard(ctx context.Context, request *protos.ShardRequest) (*protos.ShardReply, error) { + sh, err := l.mgr.GetShard(ctx, request.Id) + if err != nil { + return nil, err + } + return &protos.ShardReply{ + Shard: datashards.DataShardToProto(sh), + }, nil +} + // CreateDistribution creates distribution in QDB // TODO: unit tests func (l *LocalQrouterServer) CreateDistribution(ctx context.Context, request *protos.CreateDistributionRequest) (*protos.CreateDistributionReply, error) { @@ -371,6 +413,24 @@ func (l *LocalQrouterServer) GetCoordinator(ctx context.Context, req *protos.Get return reply, err } +func (l *LocalQrouterServer) GetTaskGroup(ctx context.Context, _ *protos.GetTaskGroupRequest) (*protos.GetTaskGroupReply, error) { + group, err := l.mgr.GetTaskGroup(ctx) + if err != nil { + return nil, err + } + return &protos.GetTaskGroupReply{ + TaskGroup: tasks.TaskGroupToProto(group), + }, nil +} + +func (l *LocalQrouterServer) WriteTaskGroup(ctx context.Context, request *protos.WriteTaskGroupRequest) (*protos.WriteTaskGroupReply, error) { + return &protos.WriteTaskGroupReply{}, l.mgr.WriteTaskGroup(ctx, tasks.TaskGroupFromProto(request.TaskGroup)) +} + +func (l *LocalQrouterServer) RemoveTaskGroup(ctx context.Context, _ *protos.RemoveTaskGroupRequest) (*protos.RemoveTaskGroupReply, error) { + return &protos.RemoveTaskGroupReply{}, l.mgr.RemoveTaskGroup(ctx) +} + func Register(server reflection.GRPCServer, qrouter qrouter.QueryRouter, mgr meta.EntityMgr, rr rulerouter.RuleRouter) { lqr := &LocalQrouterServer{ @@ -389,6 +449,7 @@ func Register(server reflection.GRPCServer, qrouter qrouter.QueryRouter, mgr met protos.RegisterBackendConnectionsServiceServer(server, lqr) protos.RegisterPoolServiceServer(server, lqr) protos.RegisterDistributionServiceServer(server, lqr) + protos.RegisterTasksServiceServer(server, lqr) } var _ protos.KeyRangeServiceServer = &LocalQrouterServer{} @@ -398,3 +459,5 @@ var _ protos.ClientInfoServiceServer = &LocalQrouterServer{} var _ protos.BackendConnectionsServiceServer = &LocalQrouterServer{} var _ protos.PoolServiceServer = &LocalQrouterServer{} var _ protos.DistributionServiceServer = &LocalQrouterServer{} +var _ protos.TasksServiceServer = &LocalQrouterServer{} +var _ protos.ShardServiceServer = &LocalQrouterServer{} diff --git a/test/feature/conf/balancer.yaml b/test/feature/conf/balancer.yaml new file mode 100644 index 000000000..749132f23 --- /dev/null +++ b/test/feature/conf/balancer.yaml @@ -0,0 +1,11 @@ +log_level: debug + +coordinator_address: 
"regress_coordinator:7003" + +cpu_threshold: 1000 +space_threshold: 10000000 + +stat_interval_sec: 100000 + +max_move_count: 1 +keys_per_move: 10 diff --git a/test/feature/conf/balancer_cpu.yaml b/test/feature/conf/balancer_cpu.yaml new file mode 100644 index 000000000..44b0906db --- /dev/null +++ b/test/feature/conf/balancer_cpu.yaml @@ -0,0 +1,11 @@ +log_level: debug + +coordinator_address: "regress_coordinator:7003" + +cpu_threshold: 0.01 +space_threshold: 100000000 + +stat_interval_sec: 100000 + +max_move_count: 1 +keys_per_move: 10 diff --git a/test/feature/conf/balancer_many_keys.yaml b/test/feature/conf/balancer_many_keys.yaml new file mode 100644 index 000000000..9e924c442 --- /dev/null +++ b/test/feature/conf/balancer_many_keys.yaml @@ -0,0 +1,13 @@ +log_level: debug + +coordinator_address: "regress_coordinator:7003" + +cpu_threshold: 1000 +space_threshold: 10000000 + +stat_interval_sec: 100000 + +max_move_count: 5 +keys_per_move: 10000 + +timeout: 180 diff --git a/test/feature/conf/balancer_several_moves.yaml b/test/feature/conf/balancer_several_moves.yaml new file mode 100644 index 000000000..6763fbbc0 --- /dev/null +++ b/test/feature/conf/balancer_several_moves.yaml @@ -0,0 +1,13 @@ +log_level: debug + +coordinator_address: "regress_coordinator:7003" + +cpu_threshold: 1000 +space_threshold: 10000000 + +stat_interval_sec: 100000 + +max_move_count: 3 +keys_per_move: 10 + +timeout: 180 diff --git a/test/feature/features/balancer.feature b/test/feature/features/balancer.feature new file mode 100644 index 000000000..6b9800102 --- /dev/null +++ b/test/feature/features/balancer.feature @@ -0,0 +1,359 @@ +Feature: Balancer test + Background: + # + # Make host "coordinator" take control + # + Given cluster is up and running + And host "coordinator2" is stopped + And host "coordinator2" is started + + When I execute SQL on host "coordinator" + """ + REGISTER ROUTER r1 ADDRESS regress_router:7000; + CREATE DISTRIBUTION ds1 COLUMN TYPES integer; + ALTER DISTRIBUTION ds1 ATTACH RELATION xMove DISTRIBUTION KEY w_id; + ADD SHARD sh1 WITH HOSTS 'postgresql://regress@spqr_shard_1:6432/regress'; + ADD SHARD sh2 WITH HOSTS 'postgresql://regress@spqr_shard_2:6432/regress'; + """ + Then command return code should be "0" + + Scenario: balancer works + When I execute SQL on host "coordinator" + """ + CREATE KEY RANGE kr1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1; + CREATE KEY RANGE kr2 FROM 100000 ROUTE TO sh2 FOR DISTRIBUTION ds1; + """ + Then command return code should be "0" + + When I run SQL on host "router" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + INSERT INTO xMove (w_id, s) SELECT generate_series(0, 99999), 'sample text value'; + """ + Then command return code should be "0" + When I run command on host "coordinator" with timeout "60" seconds + """ + /spqr/spqr-balancer --config /spqr/test/feature/conf/balancer.yaml > /balancer.log + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 10 + """ + When I run SQL on host "shard1" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 99990 + """ + When I run SQL on host "coordinator" + """ + SHOW key_ranges; + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [{ + "Key range ID":"kr1", + 
"Distribution ID":"ds1", + "Lower bound":"0", + "Shard ID":"sh1" + }, + { + "Key range ID":"kr2", + "Distribution ID":"ds1", + "Lower bound":"99990", + "Shard ID":"sh2" + }] + """ + + Scenario: balancer works with several possible moves + When I execute SQL on host "coordinator" + """ + CREATE KEY RANGE kr1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1; + CREATE KEY RANGE kr2 FROM 100000 ROUTE TO sh2 FOR DISTRIBUTION ds1; + """ + Then command return code should be "0" + + When I run SQL on host "router" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + INSERT INTO xMove (w_id, s) SELECT generate_series(0, 99999), 'sample text value'; + """ + Then command return code should be "0" + When I run command on host "coordinator" with timeout "180" seconds + """ + /spqr/spqr-balancer --config /spqr/test/feature/conf/balancer_several_moves.yaml > /balancer.log + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 30 + """ + When I run SQL on host "shard1" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 99970 + """ + When I run SQL on host "coordinator" + """ + SHOW key_ranges; + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [{ + "Key range ID":"kr1", + "Distribution ID":"ds1", + "Lower bound":"0", + "Shard ID":"sh1" + }, + { + "Key range ID":"kr2", + "Distribution ID":"ds1", + "Lower bound":"99970", + "Shard ID":"sh2" + }] + """ + + Scenario: balancer does not move more than necessary + When I execute SQL on host "coordinator" + """ + CREATE KEY RANGE kr1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1; + CREATE KEY RANGE kr2 FROM 100000 ROUTE TO sh2 FOR DISTRIBUTION ds1; + """ + Then command return code should be "0" + + When I run SQL on host "router" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + INSERT INTO xMove (w_id, s) SELECT generate_series(0, 99999), 'sample text value'; + """ + Then command return code should be "0" + When I run command on host "coordinator" with timeout "180" seconds + """ + /spqr/spqr-balancer --config /spqr/test/feature/conf/balancer_many_keys.yaml > /balancer.log + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should not match regexp + """ + 50000 + """ + + Scenario: balancer works when transferring to previous key range + When I execute SQL on host "coordinator" + """ + CREATE KEY RANGE kr1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1; + CREATE KEY RANGE kr2 FROM 100000 ROUTE TO sh2 FOR DISTRIBUTION ds1; + """ + Then command return code should be "0" + + When I run SQL on host "router" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + INSERT INTO xMove (w_id, s) SELECT generate_series(100000, 199999), 'sample text value'; + """ + Then command return code should be "0" + When I run command on host "coordinator" with timeout "60" seconds + """ + /spqr/spqr-balancer --config /spqr/test/feature/conf/balancer_several_moves.yaml > /balancer.log + """ + Then command return code should be "0" + When I run SQL on host "shard2" + 
""" + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 99970 + """ + When I run SQL on host "shard1" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 30 + """ + When I run SQL on host "coordinator" + """ + SHOW key_ranges; + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [{ + "Key range ID":"kr1", + "Distribution ID":"ds1", + "Lower bound":"0", + "Shard ID":"sh1" + }, + { + "Key range ID":"kr2", + "Distribution ID":"ds1", + "Lower bound":"100030", + "Shard ID":"sh2" + }] + """ + + Scenario: balancer works when transferring to shard without key range + When I execute SQL on host "coordinator" + """ + CREATE KEY RANGE kr1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1; + """ + Then command return code should be "0" + + When I run SQL on host "router" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + INSERT INTO xMove (w_id, s) SELECT generate_series(0, 99999), 'sample text value'; + """ + Then command return code should be "0" + When I run command on host "coordinator" with timeout "60" seconds + """ + /spqr/spqr-balancer --config /spqr/test/feature/conf/balancer_several_moves.yaml > /balancer.log + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 30 + """ + When I run SQL on host "shard1" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 99970 + """ + When I run SQL on host "coordinator" + """ + SHOW key_ranges; + """ + Then command return code should be "0" + And SQL result should match json + """ + [{ + "Key range ID":"kr1", + "Distribution ID":"ds1", + "Lower bound":"0", + "Shard ID":"sh1" + }, + { + "Distribution ID":"ds1", + "Lower bound":"99970", + "Shard ID":"sh2" + }] + """ + + Scenario: balancer works with cpu metric + When I execute SQL on host "coordinator" + """ + CREATE KEY RANGE kr1 FROM 0 ROUTE TO sh1 FOR DISTRIBUTION ds1; + CREATE KEY RANGE kr2 FROM 100000 ROUTE TO sh2 FOR DISTRIBUTION ds1; + """ + Then command return code should be "0" + + When I run SQL on host "router" + """ + CREATE TABLE xMove(w_id INT, s TEXT); + """ + Then command return code should be "0" + When I run SQL on host "shard1" + """ + /* key_range_id:: kr1 */ INSERT INTO xMove (w_id, s) SELECT generate_series(0, 99999), 'sample text value'; + """ + Then command return code should be "0" + When I run command on host "coordinator" with timeout "60" seconds + """ + /spqr/spqr-balancer --config /spqr/test/feature/conf/balancer_cpu.yaml > /balancer.log + """ + Then command return code should be "0" + When I run SQL on host "shard2" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 10 + """ + When I run SQL on host "shard1" + """ + SELECT count(*) FROM xMove + """ + Then command return code should be "0" + And SQL result should match regexp + """ + 99990 + """ + When I run SQL on host "coordinator" + """ + SHOW key_ranges; + """ + Then command return code should be "0" + And SQL result should match json_exactly + """ + [{ + "Key range ID":"kr1", + "Distribution ID":"ds1", + "Lower bound":"0", + "Shard ID":"sh1" + }, + { + "Key range 
ID":"kr2", + "Distribution ID":"ds1", + "Lower bound":"99990", + "Shard ID":"sh2" + }] + """ diff --git a/test/feature/spqr_test.go b/test/feature/spqr_test.go index a55797091..3970ed659 100644 --- a/test/feature/spqr_test.go +++ b/test/feature/spqr_test.go @@ -583,6 +583,13 @@ func (tctx *testContext) stepIRunCommandOnHost(host string, body *godog.DocStrin return err } +func (tctx *testContext) stepIRunCommandOnHostWithTimeout(host string, timeout int, body *godog.DocString) error { + cmd := strings.TrimSpace(body.Content) + var err error + tctx.commandRetcode, tctx.commandOutput, err = tctx.composer.RunCommand(host, cmd, time.Duration(timeout)*time.Second) + return err +} + func (tctx *testContext) stepCommandReturnCodeShouldBe(code int) error { if tctx.commandRetcode != code { return fmt.Errorf("command return code is %d, while expected %d\n%s", tctx.commandRetcode, code, tctx.commandOutput) @@ -818,6 +825,7 @@ func InitializeScenario(s *godog.ScenarioContext, t *testing.T) { // command and SQL execution s.Step(`^I run command on host "([^"]*)"$`, tctx.stepIRunCommandOnHost) + s.Step(`^I run command on host "([^"]*)" with timeout "(\d+)" seconds$`, tctx.stepIRunCommandOnHostWithTimeout) s.Step(`^command return code should be "(\d+)"$`, tctx.stepCommandReturnCodeShouldBe) s.Step(`^command output should match (\w+)$`, tctx.stepCommandOutputShouldMatch) s.Step(`^I run SQL on host "([^"]*)"$`, tctx.stepIRunSQLOnHost)