Skip to content

Commit

Permalink
ddl: support creating tiflash replica in API V2 (#41415)
Browse files Browse the repository at this point in the history
close #41414
  • Loading branch information
iosmanthus authored Mar 14, 2023
1 parent cc54d1b commit d10b1c4
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 77 deletions.
8 changes: 4 additions & 4 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -3373,8 +3373,8 @@ def go_deps():
name = "com_github_pingcap_kvproto",
build_file_proto_mode = "disable_global",
importpath = "github.com/pingcap/kvproto",
sum = "h1:bgLRG7gPJCq6aduA65ZV7xWQBThTcuarBB9VdfAzV4g=",
version = "v0.0.0-20230228041042-1e9aca94bab6",
sum = "h1:z6WwBPP0Txmal5zf+H7vf/lSmKZtSS8BTNwiLjEjdnA=",
version = "v0.0.0-20230312142449-01623096c924",
)
go_repository(
name = "com_github_pingcap_log",
Expand Down Expand Up @@ -4101,8 +4101,8 @@ def go_deps():
name = "com_github_tikv_client_go_v2",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/client-go/v2",
sum = "h1:TKxKhSpF+G6yAdfbPo2Nl2vl6wP8Tm1gufU1HPnL6u0=",
version = "v2.0.7-0.20230309100832-f555fdd2c9d8",
sum = "h1:GN8W4unt5SUr663+MfwawqP7ezZiQ/CNXJIdeNsoCuM=",
version = "v2.0.7-0.20230313070726-c21bf9396ae3",
)
go_repository(
name = "com_github_tikv_pd",
Expand Down
10 changes: 0 additions & 10 deletions br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/backup"
Expand Down Expand Up @@ -1736,15 +1735,6 @@ func (rc *Client) PreCheckTableTiFlashReplica(
tables []*metautil.Table,
recorder *tiflashrec.TiFlashRecorder,
) error {
// For TiDB 6.6, we do not support recover TiFlash replica while enabling API V2.
// TODO(iosmanthus): remove this after TiFlash support API V2.
if rc.GetDomain().Store().GetCodec().GetAPIVersion() == kvrpcpb.APIVersion_V2 {
log.Warn("TiFlash does not support API V2, reset replica count to 0")
for _, table := range tables {
table.Info.TiFlashReplica = nil
}
return nil
}
tiFlashStoreCount, err := rc.getTiFlashNodeCount(ctx)
if err != nil {
return err
Expand Down
79 changes: 77 additions & 2 deletions ddl/placement/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
package placement

import (
"encoding/hex"
"encoding/json"
"fmt"
"regexp"
"strings"

"github.com/pingcap/tidb/util/codec"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -57,19 +60,91 @@ type Rule struct {
LocationLabels []string `json:"location_labels,omitempty"`
}

var _ json.Marshaler = (*TiFlashRule)(nil)
var _ json.Unmarshaler = (*TiFlashRule)(nil)

// TiFlashRule extends Rule with other necessary fields.
type TiFlashRule struct {
GroupID string
ID string
Index int
Override bool
Role PeerRoleType
Count int
Constraints Constraints
LocationLabels []string
IsolationLevel string
StartKey []byte
EndKey []byte
}

type tiFlashRule struct {
GroupID string `json:"group_id"`
ID string `json:"id"`
Index int `json:"index,omitempty"`
Override bool `json:"override,omitempty"`
StartKeyHex string `json:"start_key"`
EndKeyHex string `json:"end_key"`
Role PeerRoleType `json:"role"`
Count int `json:"count"`
Constraints Constraints `json:"label_constraints,omitempty"`
LocationLabels []string `json:"location_labels,omitempty"`
IsolationLevel string `json:"isolation_level,omitempty"`
StartKeyHex string `json:"start_key"`
EndKeyHex string `json:"end_key"`
}

// MarshalJSON implements json.Marshaler interface for TiFlashRule.
func (r *TiFlashRule) MarshalJSON() ([]byte, error) {
return json.Marshal(&tiFlashRule{
GroupID: r.GroupID,
ID: r.ID,
Index: r.Index,
Override: r.Override,
Role: r.Role,
Count: r.Count,
Constraints: r.Constraints,
LocationLabels: r.LocationLabels,
IsolationLevel: r.IsolationLevel,
StartKeyHex: hex.EncodeToString(codec.EncodeBytes(nil, r.StartKey)),
EndKeyHex: hex.EncodeToString(codec.EncodeBytes(nil, r.EndKey)),
})
}

// UnmarshalJSON implements json.Unmarshaler interface for TiFlashRule.
func (r *TiFlashRule) UnmarshalJSON(bytes []byte) error {
var rule tiFlashRule
if err := json.Unmarshal(bytes, &rule); err != nil {
return err
}
*r = TiFlashRule{
GroupID: rule.GroupID,
ID: rule.ID,
Index: rule.Index,
Override: rule.Override,
Role: rule.Role,
Count: rule.Count,
Constraints: rule.Constraints,
LocationLabels: rule.LocationLabels,
IsolationLevel: rule.IsolationLevel,
}

startKey, err := hex.DecodeString(rule.StartKeyHex)
if err != nil {
return err
}

endKey, err := hex.DecodeString(rule.EndKeyHex)
if err != nil {
return err
}

_, r.StartKey, err = codec.DecodeBytes(startKey, nil)
if err != nil {
return err
}

_, r.EndKey, err = codec.DecodeBytes(endKey, nil)

return err
}

// NewRule constructs *Rule from role, count, and constraints. It is here to
Expand Down
3 changes: 2 additions & 1 deletion ddl/placement_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,8 @@ func checkTiflashReplicaSet(t *testing.T, do *domain.Domain, db, tb string, cnt
return
}

infosync.GetMockTiFlash().CheckPlacementRule(*infosync.MakeNewRule(tbl.Meta().ID, 1, nil))
r := infosync.MakeNewRule(tbl.Meta().ID, 1, nil)
infosync.GetMockTiFlash().CheckPlacementRule(r)
require.NotNil(t, tiflashReplica)
require.Equal(t, cnt, tiflashReplica.Count)
}
Expand Down
18 changes: 9 additions & 9 deletions ddl/tiflashtest/ddl_tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,12 +146,12 @@ func (s *tiflashContext) CheckFlashback(tk *testkit.TestKit, t *testing.T) {
require.NotNil(t, tb)
if tb.Meta().Partition != nil {
for _, e := range tb.Meta().Partition.Definitions {
ruleName := fmt.Sprintf("table-%v-r", e.ID)
ruleName := infosync.MakeRuleID(e.ID)
_, ok := s.tiflash.GetPlacementRule(ruleName)
require.True(t, ok)
}
} else {
ruleName := fmt.Sprintf("table-%v-r", tb.Meta().ID)
ruleName := infosync.MakeRuleID(tb.Meta().ID)
_, ok := s.tiflash.GetPlacementRule(ruleName)
require.True(t, ok)
}
Expand Down Expand Up @@ -368,7 +368,7 @@ func TestTiFlashReplicaAvailable(t *testing.T) {
s.CheckFlashback(tk, t)
tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
require.NoError(t, err)
r, ok := s.tiflash.GetPlacementRule(fmt.Sprintf("table-%v-r", tb.Meta().ID))
r, ok := s.tiflash.GetPlacementRule(infosync.MakeRuleID(tb.Meta().ID))
require.NotNil(t, r)
require.True(t, ok)
tk.MustExec("alter table ddltiflash set tiflash replica 0")
Expand All @@ -377,7 +377,7 @@ func TestTiFlashReplicaAvailable(t *testing.T) {
require.NoError(t, err)
replica := tb.Meta().TiFlashReplica
require.Nil(t, replica)
r, ok = s.tiflash.GetPlacementRule(fmt.Sprintf("table-%v-r", tb.Meta().ID))
r, ok = s.tiflash.GetPlacementRule(infosync.MakeRuleID(tb.Meta().ID))
require.Nil(t, r)
require.False(t, ok)
}
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestSetPlacementRuleNormal(t *testing.T) {
tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("ddltiflash"))
require.NoError(t, err)
expectRule := infosync.MakeNewRule(tb.Meta().ID, 1, []string{"a", "b"})
res := s.tiflash.CheckPlacementRule(*expectRule)
res := s.tiflash.CheckPlacementRule(expectRule)
require.True(t, res)

// Set lastSafePoint to a timepoint in future, so all dropped table can be reckon as gc-ed.
Expand All @@ -568,7 +568,7 @@ func TestSetPlacementRuleNormal(t *testing.T) {
defer fCancelPD()
tk.MustExec("drop table ddltiflash")
expectRule = infosync.MakeNewRule(tb.Meta().ID, 1, []string{"a", "b"})
res = s.tiflash.CheckPlacementRule(*expectRule)
res = s.tiflash.CheckPlacementRule(expectRule)
require.True(t, res)
}

Expand Down Expand Up @@ -612,7 +612,7 @@ func TestSetPlacementRuleWithGCWorker(t *testing.T) {
require.NoError(t, err)

expectRule := infosync.MakeNewRule(tb.Meta().ID, 1, []string{"a", "b"})
res := s.tiflash.CheckPlacementRule(*expectRule)
res := s.tiflash.CheckPlacementRule(expectRule)
require.True(t, res)

ChangeGCSafePoint(tk, time.Now().Add(-time.Hour), "true", "10m0s")
Expand All @@ -622,7 +622,7 @@ func TestSetPlacementRuleWithGCWorker(t *testing.T) {

// Wait GC
time.Sleep(ddl.PollTiFlashInterval * RoundToBeAvailable)
res = s.tiflash.CheckPlacementRule(*expectRule)
res = s.tiflash.CheckPlacementRule(expectRule)
require.False(t, res)
}

Expand All @@ -643,7 +643,7 @@ func TestSetPlacementRuleFail(t *testing.T) {
require.NoError(t, err)

expectRule := infosync.MakeNewRule(tb.Meta().ID, 1, []string{})
res := s.tiflash.CheckPlacementRule(*expectRule)
res := s.tiflash.CheckPlacementRule(expectRule)
require.False(t, res)
}

Expand Down
4 changes: 2 additions & 2 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1222,7 +1222,7 @@ func ConfigureTiFlashPDForTable(id int64, count uint64, locationLabels *[]string
ctx := context.Background()
logutil.BgLogger().Info("ConfigureTiFlashPDForTable", zap.Int64("tableID", id), zap.Uint64("count", count))
ruleNew := MakeNewRule(id, count, *locationLabels)
if e := is.tiflashReplicaManager.SetPlacementRule(ctx, *ruleNew); e != nil {
if e := is.tiflashReplicaManager.SetPlacementRule(ctx, ruleNew); e != nil {
return errors.Trace(e)
}
return nil
Expand All @@ -1238,7 +1238,7 @@ func ConfigureTiFlashPDForPartitions(accel bool, definitions *[]model.PartitionD
for _, p := range *definitions {
logutil.BgLogger().Info("ConfigureTiFlashPDForPartitions", zap.Int64("tableID", tableID), zap.Int64("partID", p.ID), zap.Bool("accel", accel), zap.Uint64("count", count))
ruleNew := MakeNewRule(p.ID, count, *locationLabels)
if e := is.tiflashReplicaManager.SetPlacementRule(ctx, *ruleNew); e != nil {
if e := is.tiflashReplicaManager.SetPlacementRule(ctx, ruleNew); e != nil {
return errors.Trace(e)
}
if accel {
Expand Down
2 changes: 1 addition & 1 deletion domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func TestTiFlashManager(t *testing.T) {

// SetTiFlashPlacementRule/GetTiFlashGroupRules
rule := MakeNewRule(1, 2, []string{"a"})
require.NoError(t, SetTiFlashPlacementRule(ctx, *rule))
require.NoError(t, SetTiFlashPlacementRule(ctx, rule))
rules, err := GetTiFlashGroupRules(ctx, "tiflash")
require.NoError(t, err)
require.Equal(t, 1, len(rules))
Expand Down
Loading

0 comments on commit d10b1c4

Please sign in to comment.