Skip to content

Commit

Permalink
update origin
Browse files Browse the repository at this point in the history
Signed-off-by: bufferflies <1045931706@qq.com>
  • Loading branch information
bufferflies committed Sep 12, 2023
2 parents def9e63 + 58113f8 commit da678ef
Show file tree
Hide file tree
Showing 99 changed files with 1,580 additions and 1,231 deletions.
8 changes: 0 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,14 +272,6 @@ func WithInitMetricsOption(initMetrics bool) ClientOption {
}
}

// WithAllowTSOFallback configures the client with `allowTSOFallback` option.
// NOTICE: This should only be used for testing.
func WithAllowTSOFallback() ClientOption {
return func(c *client) {
c.option.allowTSOFallback = true
}
}

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
Expand Down
1 change: 0 additions & 1 deletion client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type option struct {
enableForwarding bool
metricsLabels prometheus.Labels
initMetrics bool
allowTSOFallback bool

// Dynamic options.
dynamicOptions [dynamicOptionCount]atomic.Value
Expand Down
17 changes: 1 addition & 16 deletions client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,22 +796,7 @@ func (c *tsoClient) compareAndSwapTS(
// all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned
// last time.
if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) {
if !c.option.allowTSOFallback {
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)),
zap.String("last-tso-server", lastTSOInfo.tsoServer),
zap.String("cur-tso-server", curTSOInfo.tsoServer),
zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID),
zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID),
zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID),
zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt),
zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt))
}
log.Error("[tso] timestamp fallback",
log.Panic("[tso] timestamp fallback",
zap.String("dc-location", dcLocation),
zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()),
zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)),
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,11 @@ error = '''
sync max ts failed, %s
'''

["PD:tso:ErrUpdateTimestamp"]
error = '''
update timestamp failed, %s
'''

["PD:typeutil:ErrBytesToUint64"]
error = '''
invalid data, must 8 bytes, but %d
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230705095454-5e220f970f27
Expand Down Expand Up @@ -145,7 +145,7 @@ require (
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/petermattis/goid v0.0.0-20211229010228-4d14c490ee36 // indirect
github.com/pingcap/tipb v0.0.0-20220718022156-3e2483c20a9e // indirect
github.com/pkg/errors v0.9.1
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/prometheus/client_model v0.2.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30 h1:EvqKcDT7ceGLW0mXqM8Cp5Z8DfgQRnwj2YTnlCLj2QI=
github.com/pingcap/kvproto v0.0.0-20230727073445-53e1f8730c30/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974 h1:Gn8rf2Mb3QDifUQHdtcopqKclc9L11hjhZFYBE65lcw=
github.com/pingcap/kvproto v0.0.0-20230905082026-5336fac26974/go.mod h1:r0q/CFcwvyeRhKtoqzmWMBebrtpIziQQ9vR+JKh1knc=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
4 changes: 4 additions & 0 deletions pd.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
{
"name": "pd-tso-bench",
"path": "tools/pd-tso-bench"
},
{
"name": "pd-api-bench",
"path": "tools/pd-api-bench"
}
],
"settings": {}
Expand Down
22 changes: 5 additions & 17 deletions pkg/audit/audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"testing"
"time"

"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/requestutil"
"github.com/tikv/pd/pkg/utils/testutil"
)

func TestLabelMatcher(t *testing.T) {
Expand Down Expand Up @@ -93,8 +93,8 @@ func TestLocalLogBackendUsingFile(t *testing.T) {
t.Parallel()
re := require.New(t)
backend := NewLocalLogBackend(true)
fname := initLog()
defer os.Remove(fname)
fname := testutil.InitTempFileLogger("info")
defer os.RemoveAll(fname)
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody"))
re.False(backend.ProcessHTTPRequest(req))
info := requestutil.GetRequestInfo(req)
Expand Down Expand Up @@ -125,8 +125,8 @@ func BenchmarkLocalLogAuditUsingTerminal(b *testing.B) {
func BenchmarkLocalLogAuditUsingFile(b *testing.B) {
b.StopTimer()
backend := NewLocalLogBackend(true)
fname := initLog()
defer os.Remove(fname)
fname := testutil.InitTempFileLogger("info")
defer os.RemoveAll(fname)
req, _ := http.NewRequest(http.MethodGet, "http://127.0.0.1:2379/test?test=test", strings.NewReader("testBody"))
b.StartTimer()
for i := 0; i < b.N; i++ {
Expand All @@ -135,15 +135,3 @@ func BenchmarkLocalLogAuditUsingFile(b *testing.B) {
backend.ProcessHTTPRequest(req)
}
}

func initLog() string {
cfg := &log.Config{}
f, _ := os.CreateTemp("/tmp", "pd_tests")
fname := f.Name()
f.Close()
cfg.File.Filename = fname
cfg.Level = "info"
lg, p, _ := log.InitLogger(cfg)
log.ReplaceGlobals(lg, p)
return fname
}
2 changes: 1 addition & 1 deletion pkg/basicserver/basic_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,5 +44,5 @@ type Server interface {
// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
IsServing() bool
// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
AddServiceReadyCallback(callbacks ...func(context.Context))
AddServiceReadyCallback(callbacks ...func(context.Context) error)
}
26 changes: 12 additions & 14 deletions pkg/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,9 @@ const (
func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionCreateOption) *RegionInfo {
// Convert unit to MB.
// If region isn't empty and less than 1MB, use 1MB instead.
// The size of empty region will be correct by the previous RegionInfo.
regionSize := heartbeat.GetApproximateSize() / units.MiB
// Due to https://github.com/tikv/tikv/pull/11170, if region size is not initialized,
// approximate size will be zero, and region size is zero not EmptyRegionApproximateSize
if heartbeat.GetApproximateSize() > 0 && regionSize < EmptyRegionApproximateSize {
regionSize = EmptyRegionApproximateSize
}
Expand Down Expand Up @@ -193,19 +194,9 @@ func RegionFromHeartbeat(heartbeat *pdpb.RegionHeartbeatRequest, opts ...RegionC
return region
}

// Inherit inherits the buckets and region size from the parent region if bucket enabled.
// correct approximate size and buckets by the previous size if here exists a reported RegionInfo.
// See https://github.com/tikv/tikv/issues/11114
func (r *RegionInfo) Inherit(origin *RegionInfo, bucketEnable bool) {
// regionSize should not be zero if region is not empty.
if r.GetApproximateSize() == 0 {
if origin != nil {
r.approximateSize = origin.approximateSize
} else {
r.approximateSize = EmptyRegionApproximateSize
}
}
if bucketEnable && origin != nil && r.buckets == nil {
// InheritBuckets inherits the buckets from the parent region if bucket enabled.
func (r *RegionInfo) InheritBuckets(origin *RegionInfo) {
if origin != nil && r.buckets == nil {
r.buckets = origin.buckets
}
}
Expand Down Expand Up @@ -515,6 +506,13 @@ func (r *RegionInfo) GetApproximateSize() int64 {
return r.approximateSize
}

// IsEmptyRegion returns whether the region is empty.
func (r *RegionInfo) IsEmptyRegion() bool {
// When cluster resumes, the region size may be not initialized, but region heartbeat is send.
// So use `==` here.
return r.approximateSize == EmptyRegionApproximateSize
}

// GetStorePeerApproximateKeys returns the approximate keys of the peer on the specified store.
func (r *RegionInfo) GetStorePeerApproximateKeys(storeID uint64) int64 {
peer := r.GetStorePeer(storeID)
Expand Down
31 changes: 2 additions & 29 deletions pkg/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,35 +186,9 @@ func TestSortedEqual(t *testing.T) {
}
}

func TestInherit(t *testing.T) {
func TestInheritBuckets(t *testing.T) {
re := require.New(t)
// size in MB
// case for approximateSize
testCases := []struct {
originExists bool
originSize uint64
size uint64
expect uint64
}{
{false, 0, 0, 1},
{false, 0, 2, 2},
{true, 0, 2, 2},
{true, 1, 2, 2},
{true, 2, 0, 2},
}
for _, testCase := range testCases {
var origin *RegionInfo
if testCase.originExists {
origin = NewRegionInfo(&metapb.Region{Id: 100}, nil)
origin.approximateSize = int64(testCase.originSize)
}
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.approximateSize = int64(testCase.size)
r.Inherit(origin, false)
re.Equal(int64(testCase.expect), r.approximateSize)
}

// bucket
data := []struct {
originBuckets *metapb.Buckets
buckets *metapb.Buckets
Expand All @@ -227,12 +201,11 @@ func TestInherit(t *testing.T) {
for _, d := range data {
origin := NewRegionInfo(&metapb.Region{Id: 100}, nil, SetBuckets(d.originBuckets))
r := NewRegionInfo(&metapb.Region{Id: 100}, nil)
r.Inherit(origin, true)
r.InheritBuckets(origin)
re.Equal(d.originBuckets, r.GetBuckets())
// region will not inherit bucket keys.
if origin.GetBuckets() != nil {
newRegion := NewRegionInfo(&metapb.Region{Id: 100}, nil)
newRegion.Inherit(origin, false)
re.NotEqual(d.originBuckets, newRegion.GetBuckets())
}
}
Expand Down
43 changes: 18 additions & 25 deletions pkg/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,33 +74,26 @@ func SetStoreDeployPath(deployPath string) StoreCreateOption {
}
}

// OfflineStore offline a store
func OfflineStore(physicallyDestroyed bool) StoreCreateOption {
// SetStoreState sets the state for the store.
func SetStoreState(state metapb.StoreState, physicallyDestroyed ...bool) StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.State = metapb.StoreState_Offline
meta.NodeState = metapb.NodeState_Removing
meta.PhysicallyDestroyed = physicallyDestroyed
store.meta = meta
}
}

// UpStore up a store
func UpStore() StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.State = metapb.StoreState_Up
meta.NodeState = metapb.NodeState_Serving
store.meta = meta
}
}

// TombstoneStore set a store to tombstone.
func TombstoneStore() StoreCreateOption {
return func(store *StoreInfo) {
meta := typeutil.DeepClone(store.meta, StoreFactory)
meta.State = metapb.StoreState_Tombstone
meta.NodeState = metapb.NodeState_Removed
switch state {
case metapb.StoreState_Up:
meta.State = metapb.StoreState_Up
meta.NodeState = metapb.NodeState_Serving
case metapb.StoreState_Offline:
if len(physicallyDestroyed) != 0 {
meta.State = metapb.StoreState_Offline
meta.NodeState = metapb.NodeState_Removing
meta.PhysicallyDestroyed = physicallyDestroyed[0]
} else {
panic("physicallyDestroyed should be set when set store state to offline")
}
case metapb.StoreState_Tombstone:
meta.State = metapb.StoreState_Tombstone
meta.NodeState = metapb.NodeState_Removed
}
store.meta = meta
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func TestCloneStore(t *testing.T) {
break
}
store.Clone(
UpStore(),
SetStoreState(metapb.StoreState_Up),
SetLastHeartbeatTS(time.Now()),
)
}
Expand Down
Loading

0 comments on commit da678ef

Please sign in to comment.