Skip to content

Commit

Permalink
cache: use redis.ParseClusterURL for Redis cluster (#925)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz authored Jan 13, 2025
1 parent 310a28f commit a2ec23f
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 200 deletions.
9 changes: 7 additions & 2 deletions .github/workflows/build_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,15 @@ jobs:
env:
MYSQL_URI: mysql://root:password@tcp(localhost:${{ job.services.mariadb.ports[3306] }})/

- name: Test Redis cluster
- name: Test Redis cluster (1 address)
run: go test ./storage/cache -run ^TestRedis
env:
REDIS_URI: redis+cluster://localhost:7005
REDIS_URI: redis+cluster://localhost:7000

- name: Test Redis cluster (6 addresses)
run: go test ./storage/cache -run ^TestRedis
env:
REDIS_URI: redis+cluster://localhost:7000?addr=localhost:7001&addr=localhost:7002&addr=localhost:7003&addr=localhost:7004&addr=localhost:7005

golangci:
name: lint
Expand Down
3 changes: 2 additions & 1 deletion config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
# The database for caching, support Redis, MySQL, Postgres and MongoDB:
# redis://<user>:<password>@<host>:<port>/<db_number>
# rediss://<user>:<password>@<host>:<port>/<db_number>
# redis+cluster://<user>:<password>@<host1>:<port1>,<host2>:<port2>,...,<hostN>:<portN>
# redis+cluster://<user>:<password>@<host>:<port>[?addr=<host2>:<port2>&addr=<host3>:<port3>]
# rediss+cluster://<user>:<password>@<host>:<port>[?addr=<host2>:<port2>&addr=<host3>:<port3>]
# mysql://[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
# postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full
# postgresql://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full
Expand Down
10 changes: 8 additions & 2 deletions storage/cache/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,14 @@ func Open(path, tablePrefix string, opts ...storage.Option) (Database, error) {
return nil, errors.Trace(err)
}
return database, nil
} else if strings.HasPrefix(path, storage.RedisClusterPrefix) {
opt, err := ParseRedisClusterURL(path)
} else if strings.HasPrefix(path, storage.RedisClusterPrefix) || strings.HasPrefix(path, storage.RedissClusterPrefix) {
var newURL string
if strings.HasPrefix(path, storage.RedisClusterPrefix) {
newURL = strings.Replace(path, storage.RedisClusterPrefix, storage.RedisPrefix, 1)
} else if strings.HasPrefix(path, storage.RedissClusterPrefix) {
newURL = strings.Replace(path, storage.RedissClusterPrefix, storage.RedissPrefix, 1)
}
opt, err := redis.ParseClusterURL(newURL)
if err != nil {
return nil, err
}
Expand Down
157 changes: 0 additions & 157 deletions storage/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"encoding/base64"
"fmt"
"io"
"net/url"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -513,159 +512,3 @@ func escape(s string) string {
)
return r.Replace(s)
}

func ParseRedisClusterURL(redisURL string) (*redis.ClusterOptions, error) {
options := &redis.ClusterOptions{}
uri := redisURL

var err error
if strings.HasPrefix(redisURL, storage.RedisClusterPrefix) {
uri = uri[len(storage.RedisClusterPrefix):]
} else {
return nil, fmt.Errorf("scheme must be \"redis+cluster\"")
}

if idx := strings.Index(uri, "@"); idx != -1 {
userInfo := uri[:idx]
uri = uri[idx+1:]

username := userInfo
var password string

if idx := strings.Index(userInfo, ":"); idx != -1 {
username = userInfo[:idx]
password = userInfo[idx+1:]
}

// Validate and process the username.
if strings.Contains(username, "/") {
return nil, fmt.Errorf("unescaped slash in username")
}
options.Username, err = url.PathUnescape(username)
if err != nil {
return nil, errors.Wrap(err, fmt.Errorf("invalid username"))
}

// Validate and process the password.
if strings.Contains(password, ":") {
return nil, fmt.Errorf("unescaped colon in password")
}
if strings.Contains(password, "/") {
return nil, fmt.Errorf("unescaped slash in password")
}
options.Password, err = url.PathUnescape(password)
if err != nil {
return nil, errors.Wrap(err, fmt.Errorf("invalid password"))
}
}

// fetch the hosts field
hosts := uri
if idx := strings.IndexAny(uri, "/?@"); idx != -1 {
if uri[idx] == '@' {
return nil, fmt.Errorf("unescaped @ sign in user info")
}
hosts = uri[:idx]
}

options.Addrs = strings.Split(hosts, ",")
uri = uri[len(hosts):]
if len(uri) > 0 && uri[0] == '/' {
uri = uri[1:]
}

// grab connection arguments from URI
connectionArgsFromQueryString, err := extractQueryArgsFromURI(uri)
if err != nil {
return nil, err
}
for _, pair := range connectionArgsFromQueryString {
err = addOption(options, pair)
if err != nil {
return nil, err
}
}

return options, nil
}

func extractQueryArgsFromURI(uri string) ([]string, error) {
if len(uri) == 0 {
return nil, nil
}

if uri[0] != '?' {
return nil, errors.New("must have a ? separator between path and query")
}

uri = uri[1:]
if len(uri) == 0 {
return nil, nil
}
return strings.FieldsFunc(uri, func(r rune) bool { return r == ';' || r == '&' }), nil
}

type optionHandler struct {
int *int
bool *bool
duration *time.Duration
}

func addOption(options *redis.ClusterOptions, pair string) error {
kv := strings.SplitN(pair, "=", 2)
if len(kv) != 2 || kv[0] == "" {
return fmt.Errorf("invalid option")
}

key, err := url.QueryUnescape(kv[0])
if err != nil {
return errors.Wrap(err, errors.Errorf("invalid option key %q", kv[0]))
}

value, err := url.QueryUnescape(kv[1])
if err != nil {
return errors.Wrap(err, errors.Errorf("invalid option value %q", kv[1]))
}

handlers := map[string]optionHandler{
"max_retries": {int: &options.MaxRetries},
"min_retry_backoff": {duration: &options.MinRetryBackoff},
"max_retry_backoff": {duration: &options.MaxRetryBackoff},
"dial_timeout": {duration: &options.DialTimeout},
"read_timeout": {duration: &options.ReadTimeout},
"write_timeout": {duration: &options.WriteTimeout},
"pool_fifo": {bool: &options.PoolFIFO},
"pool_size": {int: &options.PoolSize},
"pool_timeout": {duration: &options.PoolTimeout},
"min_idle_conns": {int: &options.MinIdleConns},
"max_idle_conns": {int: &options.MaxIdleConns},
"conn_max_idle_time": {duration: &options.ConnMaxIdleTime},
"conn_max_lifetime": {duration: &options.ConnMaxLifetime},
}

lowerKey := strings.ToLower(key)
if handler, ok := handlers[lowerKey]; ok {
if handler.int != nil {
*handler.int, err = strconv.Atoi(value)
if err != nil {
return errors.Wrap(err, fmt.Errorf("invalid '%s' value: %q", key, value))
}
} else if handler.duration != nil {
*handler.duration, err = time.ParseDuration(value)
if err != nil {
return errors.Wrap(err, fmt.Errorf("invalid '%s' value: %q", key, value))
}
} else if handler.bool != nil {
*handler.bool, err = strconv.ParseBool(value)
if err != nil {
return errors.Wrap(err, fmt.Errorf("invalid '%s' value: %q", key, value))
}
} else {
return fmt.Errorf("redis: unexpected option: %s", key)
}
} else {
return fmt.Errorf("redis: unexpected option: %s", key)
}

return nil
}
38 changes: 12 additions & 26 deletions storage/cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"github.com/zhenghaoz/gorse/base/log"
Expand Down Expand Up @@ -52,8 +53,17 @@ func (suite *RedisTestSuite) SetupSuite() {
suite.Database, err = Open(redisDSN, "gorse_")
suite.NoError(err)
// flush db
err = suite.Database.(*Redis).client.FlushDB(context.TODO()).Err()
suite.NoError(err)
redisClient, ok := suite.Database.(*Redis)
suite.True(ok)
if clusterClient, ok := redisClient.client.(*redis.ClusterClient); ok {
err = clusterClient.ForEachMaster(context.Background(), func(ctx context.Context, client *redis.Client) error {
return client.FlushDB(ctx).Err()
})
suite.NoError(err)
} else {
err = redisClient.client.FlushDB(context.TODO()).Err()
suite.NoError(err)
}
// create schema
err = suite.Database.Init()
suite.NoError(err)
Expand Down Expand Up @@ -114,27 +124,3 @@ func BenchmarkRedis(b *testing.B) {
// benchmark
benchmark(b, database)
}

func TestParseRedisClusterURL(t *testing.T) {
options, err := ParseRedisClusterURL("redis+cluster://username:password@127.0.0.1:6379,127.0.0.1:6380,127.0.0.1:6381/?" +
"max_retries=1000&dial_timeout=1h&pool_fifo=true")
if assert.NoError(t, err) {
assert.Equal(t, "username", options.Username)
assert.Equal(t, "password", options.Password)
assert.Equal(t, []string{"127.0.0.1:6379", "127.0.0.1:6380", "127.0.0.1:6381"}, options.Addrs)
assert.Equal(t, 1000, options.MaxRetries)
assert.Equal(t, time.Hour, options.DialTimeout)
assert.True(t, options.PoolFIFO)
}

_, err = ParseRedisClusterURL("redis://")
assert.Error(t, err)
_, err = ParseRedisClusterURL("redis+cluster://username:password@127.0.0.1:6379/?max_retries=a")
assert.Error(t, err)
_, err = ParseRedisClusterURL("redis+cluster://username:password@127.0.0.1:6379/?dial_timeout=a")
assert.Error(t, err)
_, err = ParseRedisClusterURL("redis+cluster://username:password@127.0.0.1:6379/?pool_fifo=a")
assert.Error(t, err)
_, err = ParseRedisClusterURL("redis+cluster://username:password@127.0.0.1:6379/?a=1")
assert.Error(t, err)
}
25 changes: 13 additions & 12 deletions storage/scheme.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,19 @@ import (
)

const (
MySQLPrefix = "mysql://"
MongoPrefix = "mongodb://"
MongoSrvPrefix = "mongodb+srv://"
PostgresPrefix = "postgres://"
PostgreSQLPrefix = "postgresql://"
ClickhousePrefix = "clickhouse://"
CHHTTPPrefix = "chhttp://"
CHHTTPSPrefix = "chhttps://"
SQLitePrefix = "sqlite://"
RedisPrefix = "redis://"
RedissPrefix = "rediss://"
RedisClusterPrefix = "redis+cluster://"
MySQLPrefix = "mysql://"
MongoPrefix = "mongodb://"
MongoSrvPrefix = "mongodb+srv://"
PostgresPrefix = "postgres://"
PostgreSQLPrefix = "postgresql://"
ClickhousePrefix = "clickhouse://"
CHHTTPPrefix = "chhttp://"
CHHTTPSPrefix = "chhttps://"
SQLitePrefix = "sqlite://"
RedisPrefix = "redis://"
RedissPrefix = "rediss://"
RedisClusterPrefix = "redis+cluster://"
RedissClusterPrefix = "rediss+cluster://"
)

func AppendURLParams(rawURL string, params []lo.Tuple2[string, string]) (string, error) {
Expand Down

0 comments on commit a2ec23f

Please sign in to comment.