Skip to content

Commit

Permalink
sessionctx: change variable tidb_enable_tiflash_pipeline_model to g…
Browse files Browse the repository at this point in the history
…lobal level (pingcap#46709)

close pingcap#45618
  • Loading branch information
SeaRise authored Sep 13, 2023
1 parent 3735e11 commit 8f8c433
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 14 deletions.
2 changes: 1 addition & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func SetTiFlashConfVarsInContext(ctx context.Context, sctx sessionctx.Context) c
if sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalSort != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalSort, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalSort, 10))
}
if sctx.GetSessionVars().TiFlashEnablePipelineMode {
if variable.TiFlashEnablePipelineMode.Load() {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBEnableTiFlashPipelineMode, "1")
} else {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBEnableTiFlashPipelineMode, "0")
Expand Down
6 changes: 0 additions & 6 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,11 +892,6 @@ type SessionVars struct {
// TiFlashQuerySpillRatio is the percentage threshold to trigger auto spill in TiFlash if TiFlashMaxQueryMemoryPerNode is set
TiFlashQuerySpillRatio float64

// TiFlashEnablePipelineMode means if we should use pipeline model to execute query or not in tiflash.
// Default value is `true`, means never use pipeline model in tiflash.
// Value set to `true` means try to execute query with pipeline model in tiflash.
TiFlashEnablePipelineMode bool

// TiDBAllowAutoRandExplicitInsert indicates whether explicit insertion on auto_random column is allowed.
AllowAutoRandExplicitInsert bool

Expand Down Expand Up @@ -2057,7 +2052,6 @@ func NewSessionVars(hctx HookContext) *SessionVars {
vars.TiFlashMaxBytesBeforeExternalSort = DefTiFlashMaxBytesBeforeExternalSort
vars.TiFlashMaxQueryMemoryPerNode = DefTiFlashMemQuotaQueryPerNode
vars.TiFlashQuerySpillRatio = DefTiFlashQuerySpillRatio
vars.TiFlashEnablePipelineMode = DefTiDBEnableTiFlashPipelineMode
vars.MPPStoreFailTTL = DefTiDBMPPStoreFailTTL
vars.DiskTracker = disk.NewTracker(memory.LabelForSession, -1)
vars.MemTracker = memory.NewTracker(memory.LabelForSession, vars.MemQuotaQuery)
Expand Down
6 changes: 4 additions & 2 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,11 @@ var defaultSysVars = []*SysVar{
s.TiFlashQuerySpillRatio = tidbOptFloat64(val, DefTiFlashQuerySpillRatio)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableTiFlashPipelineMode, Type: TypeBool, Value: BoolToOnOff(DefTiDBEnableTiFlashPipelineMode), SetSession: func(s *SessionVars, val string) error {
s.TiFlashEnablePipelineMode = TiDBOptOn(val)
{Scope: ScopeGlobal, Name: TiDBEnableTiFlashPipelineMode, Type: TypeBool, Value: BoolToOnOff(DefTiDBEnableTiFlashPipelineMode), SetGlobal: func(ctx context.Context, vars *SessionVars, s string) error {
TiFlashEnablePipelineMode.Store(TiDBOptOn(s))
return nil
}, GetGlobal: func(ctx context.Context, vars *SessionVars) (string, error) {
return BoolToOnOff(TiFlashEnablePipelineMode.Load()), nil
}},
{Scope: ScopeSession, Name: TiDBSnapshot, Value: "", skipInit: true, SetSession: func(s *SessionVars, val string) error {
err := setSnapshotTS(s, val)
Expand Down
22 changes: 22 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,6 +1360,28 @@ func TestTiDBTiFlashReplicaRead(t *testing.T) {
require.Equal(t, DefTiFlashReplicaRead, val)
}

func TestSetEnableTiFlashPipeline(t *testing.T) {
vars := NewSessionVars(nil)
mock := NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
enablePipeline := GetSysVar(TiDBEnableTiFlashPipelineMode)
// Check default value
require.Equal(t, "ON", enablePipeline.Value)

err := mock.SetGlobalSysVar(context.Background(), TiDBEnableTiFlashPipelineMode, "OFF")
require.NoError(t, err)
val, err := mock.GetGlobalSysVar(TiDBEnableTiFlashPipelineMode)
require.NoError(t, err)
require.Equal(t, "OFF", val)

err = mock.SetGlobalSysVar(context.Background(), TiDBEnableTiFlashPipelineMode, "ON")
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBEnableTiFlashPipelineMode)
require.NoError(t, err)
require.Equal(t, "ON", val)
}

func TestSetTiDBCloudStorageURI(t *testing.T) {
vars := NewSessionVars(nil)
mock := NewMockGlobalAccessor4Tests()
Expand Down
10 changes: 5 additions & 5 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,6 @@ const (
// TiFlashQuerySpillRatio is the threshold that TiFlash will trigger auto spill when the memory usage is above this percentage
TiFlashQuerySpillRatio = "tiflash_query_spill_ratio"

// TiDBEnableTiFlashPipelineMode means if we should use pipeline model to execute query or not in tiflash.
// Default value is `true`, means never use pipeline model in tiflash.
// Value set to `true` means try to execute query with pipeline model in tiflash.
TiDBEnableTiFlashPipelineMode = "tidb_enable_tiflash_pipeline_model"

// TiDBMPPStoreFailTTL is the unavailable time when a store is detected failed. During that time, tidb will not send any task to
// TiFlash even though the failed TiFlash node has been recovered.
TiDBMPPStoreFailTTL = "tidb_mpp_store_fail_ttl"
Expand Down Expand Up @@ -1101,6 +1096,10 @@ const (
TiDBServiceScope = "tidb_service_scope"
// TiDBSchemaVersionCacheLimit defines the capacity size of domain infoSchema cache.
TiDBSchemaVersionCacheLimit = "tidb_schema_version_cache_limit"
// TiDBEnableTiFlashPipelineMode means if we should use pipeline model to execute query or not in tiflash.
// Default value is `true`, means never use pipeline model in tiflash.
// Value set to `true` means try to execute query with pipeline model in tiflash.
TiDBEnableTiFlashPipelineMode = "tiflash_enable_pipeline_model"
)

// TiDB intentional limits
Expand Down Expand Up @@ -1504,6 +1503,7 @@ var (
EnableResourceControl = atomic.NewBool(false)
EnableCheckConstraint = atomic.NewBool(DefTiDBEnableCheckConstraint)
SkipMissingPartitionStats = atomic.NewBool(DefTiDBSkipMissingPartitionStats)
TiFlashEnablePipelineMode = atomic.NewBool(DefTiDBEnableTiFlashPipelineMode)
ServiceScope = atomic.NewString("")
SchemaVersionCacheLimit = atomic.NewInt64(DefTiDBSchemaVersionCacheLimit)
CloudStorageURI = atomic.NewString("")
Expand Down
7 changes: 7 additions & 0 deletions sessionctx/variable/variable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,13 @@ func TestSkipSysvarCache(t *testing.T) {
require.False(t, GetSysVar(TiDBEnableAsyncCommit).SkipSysvarCache())
}

func TestTiFlashEnablePipeline(t *testing.T) {
require.True(t, GetSysVar(TiDBEnableTiFlashPipelineMode).HasGlobalScope())
require.False(t, GetSysVar(TiDBEnableTiFlashPipelineMode).HasSessionScope())
require.False(t, GetSysVar(TiDBEnableTiFlashPipelineMode).HasInstanceScope())
require.False(t, GetSysVar(TiDBEnableTiFlashPipelineMode).HasNoneScope())
}

func TestTimeValidationWithTimezone(t *testing.T) {
sv := SysVar{Scope: ScopeSession, Name: "mynewsysvar", Value: "23:59 +0000", Type: TypeTime}
vars := NewSessionVars(nil)
Expand Down

0 comments on commit 8f8c433

Please sign in to comment.