Skip to content

Commit

Permalink
controller: try use atomic replace the lock
Browse files Browse the repository at this point in the history
Signed-off-by: nolouch <nolouch@gmail.com>
  • Loading branch information
nolouch committed Jan 3, 2025
1 parent 41919ad commit 446ae1d
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 2 deletions.
78 changes: 76 additions & 2 deletions client/resource_group/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/gogo/protobuf/proto"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -1463,7 +1464,10 @@ func (gc *groupCostController) onResponseImpl(
}
}
}

// no need to record the consumption, fast path.
if delta.RRU+delta.WRU == 0 {
return delta, nil
}
gc.mu.Lock()
// Record the consumption of the request
add(gc.mu.consumption, delta)
Expand Down Expand Up @@ -1504,7 +1508,10 @@ func (gc *groupCostController) onResponseWaitImpl(
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}

// no need to record the consumption, fast path.
if delta.RRU+delta.WRU == 0 {
return delta, waitDuration, nil
}
gc.mu.Lock()
// Record the consumption of the request
add(gc.mu.consumption, delta)
Expand All @@ -1522,6 +1529,73 @@ func (gc *groupCostController) onResponseWaitImpl(
return delta, waitDuration, nil
}

func (gc *groupCostController) onResponseWaitAtomicImpl(
ctx context.Context, req RequestInfo, resp ResponseInfo,
) (*rmpb.Consumption, time.Duration, error) {
delta := &rmpb.Consumption{}
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}
var waitDuration time.Duration
if !gc.burstable.Load() {
allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load()
d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt)
if err != nil {
if errs.ErrClientResourceGroupThrottled.Equal(err) {
gc.metrics.failedRequestCounterWithThrottled.Inc()
gc.metrics.failedLimitReserveDuration.Observe(d.Seconds())
} else {
gc.metrics.failedRequestCounterWithOthers.Inc()
}
return nil, waitDuration, err
}
gc.metrics.successfulRequestDuration.Observe(d.Seconds())
waitDuration += d
}
// no need to record the consumption, fast path.
if delta.RRU+delta.WRU == 0 {
return delta, waitDuration, nil
}
// Record the consumption of the request
AtomicAddConsumption(gc.mu.consumption, delta)
// Record the consumption of the request by store
count := &rmpb.Consumption{}
*count = *delta
// As the penalty is only counted when the request is completed, so here needs to calculate the write cost which is added in `BeforeKVRequest`
for _, calc := range gc.calculators {
calc.BeforeKVRequest(count, req)
}
AtomicAddConsumption(gc.mu.storeCounter[req.StoreID()], count)
AtomicAddConsumption(gc.mu.globalCounter, count)
return delta, waitDuration, nil
}

// AtomicAdd applies an atomic addition on a field of Consumption.
func AtomicAdd(target *float64, delta float64) {
for {
old := atomic.LoadUint64((*uint64)(unsafe.Pointer(target)))
newValue := math.Float64bits(math.Float64frombits(old) + delta)
if atomic.CompareAndSwapUint64((*uint64)(unsafe.Pointer(target)), old, newValue) {
break
}
}
}

// AtomicAddConsumption performs atomic addition for all fields in Consumption.
func AtomicAddConsumption(target, delta *rmpb.Consumption) {
if target == nil || delta == nil {
return
}
AtomicAdd(&target.RRU, delta.RRU)
AtomicAdd(&target.WRU, delta.WRU)
AtomicAdd(&target.ReadBytes, delta.ReadBytes)
AtomicAdd(&target.WriteBytes, delta.WriteBytes)
AtomicAdd(&target.TotalCpuTimeMs, delta.TotalCpuTimeMs)
AtomicAdd(&target.SqlLayerCpuTimeMs, delta.SqlLayerCpuTimeMs)
AtomicAdd(&target.KvReadRpcCount, delta.KvReadRpcCount)
AtomicAdd(&target.KvWriteRpcCount, delta.KvWriteRpcCount)
}

// GetActiveResourceGroup is used to get active resource group.
// This is used for test only.
func (c *ResourceGroupsController) GetActiveResourceGroup(resourceGroupName string) *rmpb.ResourceGroup {
Expand Down
Binary file added client/resource_group/controller/controller.test
Binary file not shown.
69 changes: 69 additions & 0 deletions client/resource_group/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,72 @@ func TestTryGetController(t *testing.T) {
re.NoError(err)
re.NotEmpty(consumption)
}

func BenchmarkRequestAndResponseConsumptionLockVer(b *testing.B) {
gc := createTestGroupCostController(require.New(b))
testCases := []struct {
req *TestRequestInfo
resp *TestResponseInfo
}{
{
req: &TestRequestInfo{
isWrite: false,
writeBytes: 0,
},
resp: &TestResponseInfo{
readBytes: 100,
kvCPU: 100 * time.Millisecond,
succeed: true,
},
},
}
// exclude the token bucket locks
gc.burstable.Store(true)

// Use b.RunParallel to simulate concurrent scenarios
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for idx, testCase := range testCases {
_, _, err := gc.onResponseWaitImpl(context.TODO(), testCase.req, testCase.resp)
if err != nil {
b.Fatalf("onResponseImpl failed: %v (%d)", err, idx)
}
}
}
})
}

func BenchmarkRequestAndResponseConsumptionAtomicVer(b *testing.B) {
gc := createTestGroupCostController(require.New(b))
testCases := []struct {
req *TestRequestInfo
resp *TestResponseInfo
}{
{
req: &TestRequestInfo{
isWrite: false,
writeBytes: 0,
},
resp: &TestResponseInfo{
readBytes: 100,
kvCPU: 100 * time.Millisecond,
succeed: true,
},
},
}

// exclude the token bucket locks
gc.burstable.Store(true)

// Use b.RunParallel to simulate concurrent scenarios
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
for idx, testCase := range testCases {
_, _, err := gc.onResponseWaitAtomicImpl(context.TODO(), testCase.req, testCase.resp)
if err != nil {
b.Fatalf("onResponseImpl failed: %v (%d)", err, idx)
}
}
}
})
}

0 comments on commit 446ae1d

Please sign in to comment.