Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add protocol option #2598

Merged
merged 1 commit into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type ClusterOptions struct {

OnConnect func(ctx context.Context, cn *Conn) error

Protocol int
Username string
Password string

Expand Down Expand Up @@ -263,6 +264,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,

Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,

Expand Down
38 changes: 38 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,35 @@ var _ = Describe("ClusterClient", func() {
})
}

Describe("ClusterClient PROTO 2", func() {
BeforeEach(func() {
opt = redisClusterOptions()
opt.Protocol = 2
client = cluster.newClusterClient(ctx, opt)

err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should CLUSTER PROTO 2", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
return nil
})
})
})

Describe("ClusterClient", func() {
BeforeEach(func() {
opt = redisClusterOptions()
Expand Down Expand Up @@ -746,6 +775,15 @@ var _ = Describe("ClusterClient", func() {
})
})

It("should CLUSTER PROTO 3", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
return nil
})
})

It("should CLUSTER MYSHARDID", func() {
shardID, err := client.ClusterMyShardID(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expand Down
4 changes: 4 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Options struct {
// Hook that is called when new connection is established.
OnConnect func(ctx context.Context, cn *Conn) error

// Protocol 2 or 3. Use the version to negotiate RESP version with redis-server.
// Default is 3.
Protocol int
// Use the specified Username to authenticate the current connection
// with one of the connections defined in the ACL list when connecting
// to a Redis 6.0 instance, or greater, that is using the Redis ACL system.
Expand Down Expand Up @@ -437,6 +440,7 @@ func setupConnParams(u *url.URL, o *Options) (*Options, error) {
o.DB = db
}

o.Protocol = q.int("protocol")
o.ClientName = q.string("client_name")
o.MaxRetries = q.int("max_retries")
o.MinRetryBackoff = q.duration("min_retry_backoff")
Expand Down
3 changes: 3 additions & 0 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func TestParseURL(t *testing.T) {
}, {
url: "redis://localhost:123/?db=2&client_name=hi", // client name
o: &Options{Addr: "localhost:123", DB: 2, ClientName: "hi"},
}, {
url: "redis://localhost:123/?db=2&protocol=2", // RESP Protocol
o: &Options{Addr: "localhost:123", DB: 2, Protocol: 2},
}, {
url: "unix:///tmp/redis.sock",
o: &Options{Addr: "/tmp/redis.sock"},
Expand Down
7 changes: 6 additions & 1 deletion redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,15 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
conn := newConn(c.opt, connPool)

var auth bool
protocol := c.opt.Protocol
// By default, use RESP3 in current version.
if protocol < 2 {
protocol = 3
}

// for redis-server versions that do not support the HELLO command,
// RESP2 will continue to be used.
if err := conn.Hello(ctx, 3, username, password, "").Err(); err == nil {
if err := conn.Hello(ctx, protocol, username, password, "").Err(); err == nil {
auth = true
} else if !isRedisError(err) {
// When the server responds with the RESP protocol and the result is not a normal
Expand Down
27 changes: 27 additions & 0 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,33 @@ var _ = Describe("Client", func() {
Expect(val).Should(ContainSubstring("name=hi"))
})

It("should client PROTO 2", func() {
opt := redisOptions()
opt.Protocol = 2
db := redis.NewClient(opt)

defer func() {
Expect(db.Close()).NotTo(HaveOccurred())
}()

val, err := db.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
})

It("should client PROTO 3", func() {
opt := redisOptions()
db := redis.NewClient(opt)

defer func() {
Expect(db.Close()).NotTo(HaveOccurred())
}()

val, err := db.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
})

It("processes custom commands", func() {
cmd := redis.NewCmd(ctx, "PING")
_ = client.Process(ctx, cmd)
Expand Down
4 changes: 3 additions & 1 deletion ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"time"

"github.com/cespare/xxhash/v2"
rendezvous "github.com/dgryski/go-rendezvous" //nolint
"github.com/dgryski/go-rendezvous" //nolint

"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/hashtag"
Expand Down Expand Up @@ -70,6 +70,7 @@ type RingOptions struct {
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error

Protocol int
Username string
Password string
DB int
Expand Down Expand Up @@ -136,6 +137,7 @@ func (opt *RingOptions) clientOptions() *Options {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,

Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,
DB: opt.DB,
Expand Down
40 changes: 40 additions & 0 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,37 @@ import (
"github.com/redis/go-redis/v9"
)

var _ = Describe("Redis Ring PROTO 2", func() {
const heartbeat = 100 * time.Millisecond

var ring *redis.Ring

BeforeEach(func() {
opt := redisRingOptions()
opt.Protocol = 2
opt.HeartbeatFrequency = heartbeat
ring = redis.NewRing(opt)

err := ring.ForEachShard(ctx, func(ctx context.Context, cl *redis.Client) error {
return cl.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
})

AfterEach(func() {
Expect(ring.Close()).NotTo(HaveOccurred())
})

It("should ring PROTO 2", func() {
_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
return nil
})
})
})

var _ = Describe("Redis Ring", func() {
const heartbeat = 100 * time.Millisecond

Expand Down Expand Up @@ -65,6 +96,15 @@ var _ = Describe("Redis Ring", func() {
})
})

It("should ring PROTO 3", func() {
_ = ring.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
return nil
})
})

It("distributes keys", func() {
setRingKeys()

Expand Down
3 changes: 3 additions & 0 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type FailoverOptions struct {
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error

Protocol int
Username string
Password string
DB int
Expand Down Expand Up @@ -88,6 +89,7 @@ func (opt *FailoverOptions) clientOptions() *Options {
OnConnect: opt.OnConnect,

DB: opt.DB,
Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,

Expand Down Expand Up @@ -151,6 +153,7 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,

Protocol: opt.Protocol,
Username: opt.Username,
Password: opt.Password,

Expand Down
67 changes: 67 additions & 0 deletions sentinel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,30 @@ import (
"github.com/redis/go-redis/v9"
)

var _ = Describe("Sentinel PROTO 2", func() {
var client *redis.Client

BeforeEach(func() {
client = redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: sentinelName,
SentinelAddrs: sentinelAddrs,
MaxRetries: -1,
Protocol: 2,
})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

AfterEach(func() {
_ = client.Close()
})

It("should sentinel client PROTO 2", func() {
val, err := client.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
})
})

var _ = Describe("Sentinel", func() {
var client *redis.Client
var master *redis.Client
Expand Down Expand Up @@ -134,6 +158,40 @@ var _ = Describe("Sentinel", func() {
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=sentinel_hi"))
})

It("should sentinel client PROTO 3", func() {
val, err := client.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
})
})

var _ = Describe("NewFailoverClusterClient PROTO 2", func() {
var client *redis.ClusterClient

BeforeEach(func() {
client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
MasterName: sentinelName,
SentinelAddrs: sentinelAddrs,
Protocol: 2,

RouteRandomly: true,
})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
})

AfterEach(func() {
_ = client.Close()
})

It("should sentinel cluster PROTO 2", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := client.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
return nil
})
})
})

var _ = Describe("NewFailoverClusterClient", func() {
Expand Down Expand Up @@ -237,6 +295,15 @@ var _ = Describe("NewFailoverClusterClient", func() {
return nil
})
})

It("should sentinel cluster PROTO 3", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := client.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
return nil
})
})
})

var _ = Describe("SentinelAclAuth", func() {
Expand Down
4 changes: 4 additions & 0 deletions universal.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type UniversalOptions struct {
Dialer func(ctx context.Context, network, addr string) (net.Conn, error)
OnConnect func(ctx context.Context, cn *Conn) error

Protocol int
Username string
Password string
SentinelUsername string
Expand Down Expand Up @@ -77,6 +78,7 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
Dialer: o.Dialer,
OnConnect: o.OnConnect,

Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,

Expand Down Expand Up @@ -122,6 +124,7 @@ func (o *UniversalOptions) Failover() *FailoverOptions {
OnConnect: o.OnConnect,

DB: o.DB,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,
SentinelUsername: o.SentinelUsername,
Expand Down Expand Up @@ -162,6 +165,7 @@ func (o *UniversalOptions) Simple() *Options {
OnConnect: o.OnConnect,

DB: o.DB,
Protocol: o.Protocol,
Username: o.Username,
Password: o.Password,

Expand Down