Skip to content

Commit

Permalink
feat: add re-entry and automatic renewal for redis lock (#150)
Browse files Browse the repository at this point in the history
  • Loading branch information
qloog committed Sep 4, 2024
1 parent 8549737 commit 39935b6
Show file tree
Hide file tree
Showing 6 changed files with 212 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## v1.10.0

- feat: support async flush log to disk
- feat: add re-entry and automatic renewal for redis lock
- chore: using gorm offical plguin for tracing and metrics

## v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ module github.com/go-eagle/eagle
go 1.22.3

require (
github.com/1024casts/gorm-opentelemetry v1.0.1-0.20210805144709-183269b54068
github.com/Shopify/sarama v1.19.0
github.com/alicebob/miniredis/v2 v2.15.1
github.com/cenkalti/backoff/v4 v4.2.1
Expand Down Expand Up @@ -74,6 +73,7 @@ require (
gorm.io/driver/mysql v1.5.2
gorm.io/driver/postgres v1.5.4
gorm.io/gorm v1.25.10
gorm.io/plugin/opentelemetry v0.1.4
)

require (
Expand Down Expand Up @@ -170,6 +170,7 @@ require (
github.com/shopspring/decimal v1.4.0 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/swaggo/swag v1.7.0 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
Expand Down Expand Up @@ -203,5 +204,4 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/plugin/opentelemetry v0.1.4 // indirect
)
9 changes: 1 addition & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/1024casts/gorm-opentelemetry v1.0.1-0.20210805144709-183269b54068 h1:XWMHFSEROBGd33gZNMsTb6zxECMN8xOJkO0ucOJdz58=
github.com/1024casts/gorm-opentelemetry v1.0.1-0.20210805144709-183269b54068/go.mod h1:nEAgMK5Iab8nqYy8zn1tYjEjkeDA2PmszXOwZhlDrMs=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
Expand Down Expand Up @@ -421,7 +419,6 @@ github.com/jackc/puddle/v2 v2.2.1 h1:RhxXJtFG022u4ibrCSMSiu5aOq1i77R3OHKNJj77OAk
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E=
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.1/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand Down Expand Up @@ -503,7 +500,6 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng=
github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGwNd0Lj+XmI=
github.com/mattn/go-sqlite3 v1.14.15 h1:vfoHhTN1af61xCRSWzFIWzx2YskyMTwHLrExkBOjvxI=
github.com/mattn/go-sqlite3 v1.14.15/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand Down Expand Up @@ -1255,11 +1251,8 @@ gorm.io/driver/mysql v1.5.2 h1:QC2HRskSE75wBuOxe0+iCkyJZ+RqpudsQtqkp+IMuXs=
gorm.io/driver/mysql v1.5.2/go.mod h1:pQLhh1Ut/WUAySdTHwBpBv6+JKcj+ua4ZFx1QQTBzb8=
gorm.io/driver/postgres v1.5.4 h1:Iyrp9Meh3GmbSuyIAGyjkN+n9K+GHX9b9MqsTL4EJCo=
gorm.io/driver/postgres v1.5.4/go.mod h1:Bgo89+h0CRcdA33Y6frlaHHVuTdOf87pmyzwW9C/BH0=
gorm.io/driver/sqlite v1.1.4 h1:PDzwYE+sI6De2+mxAneV9Xs11+ZyKV6oxD3wDGkaNvM=
gorm.io/driver/sqlite v1.1.4/go.mod h1:mJCeTFr7+crvS+TRnWc5Z3UvwxUN1BGBLMrf5LA9DYw=
gorm.io/driver/sqlite v1.5.0 h1:zKYbzRCpBrT1bNijRnxLDJWPjVfImGEn0lSnUY5gZ+c=
gorm.io/gorm v1.20.7/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
gorm.io/gorm v1.20.12/go.mod h1:0HFTzE/SqkGTzK6TlDPPQbAYCluiVvhzoA1+aVyzenw=
gorm.io/driver/sqlite v1.5.0/go.mod h1:kDMDfntV9u/vuMmz8APHtHF0b4nyBB7sfCieC6G8k8I=
gorm.io/gorm v1.25.2-0.20230530020048-26663ab9bf55/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k=
gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s=
gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
Expand Down
22 changes: 22 additions & 0 deletions pkg/lock/luascript.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package lock

var (
// lockscript lua script for acrequire a lock
lockLuaScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2])
return "OK"
else
return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
end
`

// unlockscript lua script for release a lock
unlockLuaScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
return redis.call("DEL", KEYS[1])
else
return 0
end
`
)
119 changes: 101 additions & 18 deletions pkg/lock/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,42 @@ package lock

import (
"context"
"errors"
"fmt"
"strconv"
"sync"
"time"

"github.com/redis/go-redis/v9"

"github.com/go-eagle/eagle/pkg/log"
)

"github.com/redis/go-redis/v9"
const (
// renewalDuration is the renewal duration.
renewalDuration int64 = 1000
)

// Lua scripts for locking and unlocking
// It only init once and cache in memory
var (

// lockScript init lua script
lockScript = redis.NewScript(lockLuaScript)

// unlockScript init lua script
unlockScript = redis.NewScript(unlockLuaScript)
)

// RedisLock is a redis lock.
// RedisLock is a Redis-based distributed lock.
type RedisLock struct {
key string
redisClient *redis.Client
token string
expiration time.Duration
mu sync.Mutex // 用于保护共享属性
renewing bool // 续期标志
stopRenew chan struct{} // 用于停止续期
}

// NewRedisLock new a redis lock instance
Expand All @@ -26,38 +48,99 @@ func NewRedisLock(rdb *redis.Client, key string, expiration time.Duration) *Redi
redisClient: rdb,
token: genToken(),
expiration: expiration,
stopRenew: make(chan struct{}),
}
return opt
}

// Lock acquires the lock.
// It will return false if the lock is already acquired.
func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
isSet, err := l.redisClient.SetNX(ctx, l.key, l.token, l.expiration).Result()
if err == redis.Nil {
return false, nil
} else if err != nil {
log.Errorf("acquires the lock err, key: %s, err: %s", l.key, err.Error())
// 加锁,防止并发问题
l.mu.Lock()
defer l.mu.Unlock()

ret, err := lockScript.Run(ctx, l.redisClient, []string{l.key},
[]string{l.token, strconv.FormatInt(l.expiration.Milliseconds()+renewalDuration, 10)},
).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return false, nil
}

log.WithContext(ctx).Errorf("redis lock: acquires the lock err, key: %s, err: %s", l.key, err.Error())
return false, err
} else if ret == nil {
return false, nil
}

reply, ok := ret.(string)
if ok && reply == "OK" {
if !l.renewing {
l.renewing = true
go l.autoRenew(ctx) // 启动续期协程
}
return true, nil
}
return isSet, nil

return false, nil
}

// Unlock del the lock.
// Unlock release a lock.
// NOTE: token 一致才会执行删除,避免误删,这里用了lua脚本进行事务处理
func (l *RedisLock) Unlock(ctx context.Context) (bool, error) {
luaScript := "if redis.call('GET',KEYS[1]) == ARGV[1] then return redis.call('DEL',KEYS[1]) else return 0 end"
ret, err := l.redisClient.Eval(ctx, luaScript, []string{l.key}, l.token).Result()
if err != nil {
return false, err
// 解锁时也需要加锁保护
l.mu.Lock()
defer l.mu.Unlock()

// 停止续期协程并标记不再续期
if l.renewing {
close(l.stopRenew)
l.renewing = false
}
reply, ok := ret.(int64)
if !ok {
return false, nil

for i := 0; i < 3; i++ { // 最多重试3次
ret, err := unlockScript.Run(ctx, l.redisClient, []string{l.key}, l.token).Result()
if err != nil {
log.WithContext(ctx).Errorf("redis lock: failed to unlock, attempt %d, key: %s, err: %v", i+1, l.key, err)
time.Sleep(50 * time.Millisecond) // 等待一下再重试
continue
}
reply, ok := ret.(int64)
if ok && reply == 1 {
return true, nil
}
break
}
return reply == 1, nil

return false, errors.New("redis lock: failed to unlock after multiple attempts")
}

func (l *RedisLock) autoRenew(ctx context.Context) {
ticker := time.NewTicker(time.Duration(renewalDuration) * time.Millisecond / 2)
defer ticker.Stop()

for {
select {
case <-ticker.C:
// 续期操作
l.mu.Lock()
l.redisClient.Expire(ctx, l.key, l.expiration)
l.mu.Unlock()
case <-l.stopRenew:
return
case <-ctx.Done():
return
}
}
}

// GetTTL returns the TTL of the lock.
func (l *RedisLock) GetTTL(ctx context.Context) (time.Duration, error) {
return l.redisClient.TTL(ctx, l.key).Result()
}

// getRedisKey 获取key
// getRedisKey returns the Redis key for the lock.
func getRedisKey(key string) string {
return fmt.Sprintf(RedisLockKey, key)
}
85 changes: 85 additions & 0 deletions pkg/lock/redis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package lock_test

import (
"context"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/go-eagle/eagle/pkg/lock"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

func TestRedisLock(t *testing.T) {
// Set up a miniredis server
s, err := miniredis.Run()
if err != nil {
t.Fatalf("failed to start miniredis: %v", err)
}
defer s.Close()

// Create a Redis client
rdb := redis.NewClient(&redis.Options{
Addr: s.Addr(),
})

ctx := context.Background()

// Test acquiring the lock
l := lock.NewRedisLock(rdb, "test-key", 5*time.Second)
locked, err := l.Lock(ctx)
assert.NoError(t, err, "expected no error on acquiring lock")
assert.True(t, locked, "expected to successfully acquire the lock")

// Test re-acquiring the lock
lockedAgain, err := l.Lock(ctx)
assert.NoError(t, err, "expected no error on re-acquiring lock")
assert.True(t, lockedAgain, "expected not to acquire the lock again")

// Test releasing the lock
unlocked, err := l.Unlock(ctx)
assert.NoError(t, err, "expected no error on releasing lock")
assert.True(t, unlocked, "expected to successfully release the lock")

// Test re-acquiring the lock after release
lockedAgain, err = l.Lock(ctx)
assert.NoError(t, err, "expected no error on re-acquiring lock after release")
assert.True(t, lockedAgain, "expected to successfully re-acquire the lock after release")
}

func TestAutoRenew(t *testing.T) {
// Set up a miniredis server
s, err := miniredis.Run()
if err != nil {
t.Fatalf("failed to start miniredis: %v", err)
}
defer s.Close()

// Create a Redis client
rdb := redis.NewClient(&redis.Options{
Addr: s.Addr(),
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Test auto-renewal of the lock
l := lock.NewRedisLock(rdb, "test-key-auto-renew", 1*time.Second)
locked, err := l.Lock(ctx)
assert.NoError(t, err, "expected no error on acquiring lock")
assert.True(t, locked, "expected to successfully acquire the lock")

// Wait for some time to ensure the lock is being renewed
time.Sleep(1500 * time.Millisecond)

// Check the TTL of the key to verify that it has been renewed
ttl, err := l.GetTTL(ctx)
assert.NoError(t, err, "expected no error on checking TTL")
assert.Greater(t, ttl, time.Duration(0), "expected TTL to be greater than zero due to renewal")

// Release the lock
unlocked, err := l.Unlock(ctx)
assert.NoError(t, err, "expected no error on releasing lock")
assert.True(t, unlocked, "expected to successfully release the lock")
}

0 comments on commit 39935b6

Please sign in to comment.