Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve retry semantics #2330

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#### Changes
* Core: Improve command retry semantics ([#2330](https://github.com/valkey-io/valkey-glide/pull/2330))
* Node: Fix binary variant for xinfogroups and lrem ([#2324](https://github.com/valkey-io/valkey-glide/pull/2324))
* Node: Fixed missing exports ([#2301](https://github.com/valkey-io/valkey-glide/pull/2301))
* Node: Use `options` struct for all optional arguments ([#2287](https://github.com/valkey-io/valkey-glide/pull/2287))
Expand Down Expand Up @@ -127,6 +128,7 @@
* Java, Node, Python: Add NOSCORES option to ZSCAN & NOVALUES option to HSCAN (Valkey-8) ([#2174](https://github.com/valkey-io/valkey-glide/pull/2174))
* Node: Add SCAN command ([#2257](https://github.com/valkey-io/valkey-glide/pull/2257))
* Java: Add Script commands ([#2261](https://github.com/valkey-io/valkey-glide/pull/2261))
* Python: Replace google-api-python-client with protobuf ([#2304](https://github.com/valkey-io/valkey-glide/pull/2304))

#### Breaking Changes
* Java: Update INFO command ([#2274](https://github.com/valkey-io/valkey-glide/pull/2274))
Expand Down
8 changes: 4 additions & 4 deletions glide-core/THIRD_PARTY_LICENSES_RUST
Original file line number Diff line number Diff line change
Expand Up @@ -10269,7 +10269,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

----

Package: iana-time-zone:0.1.60
Package: iana-time-zone:0.1.61

The following copyrights and licenses were found in the source code of this package:

Expand Down Expand Up @@ -18826,7 +18826,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

----

Package: redox_syscall:0.5.3
Package: redox_syscall:0.5.4

The following copyrights and licenses were found in the source code of this package:

Expand Down Expand Up @@ -25397,7 +25397,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

----

Package: unicode-ident:1.0.12
Package: unicode-ident:1.0.13

The following copyrights and licenses were found in the source code of this package:

Expand Down Expand Up @@ -25687,7 +25687,7 @@ authorization of the copyright holder.

----

Package: unicode-normalization:0.1.23
Package: unicode-normalization:0.1.24

The following copyrights and licenses were found in the source code of this package:

Expand Down
6 changes: 5 additions & 1 deletion glide-core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ mod value_conversion;
use tokio::sync::mpsc;

pub const HEARTBEAT_SLEEP_DURATION: Duration = Duration::from_secs(1);
pub const DEFAULT_RETRIES: u32 = 3;

// Dont use retries because we do not expose this configuration to the user,
// making it is unexpected to have command retries
pub const DEFAULT_RETRIES: u32 = 0;

pub const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_CONNECTION_ATTEMPT_TIMEOUT: Duration = Duration::from_millis(250);
pub const DEFAULT_PERIODIC_TOPOLOGY_CHECKS_INTERVAL: Duration = Duration::from_secs(60);
Expand Down
1 change: 1 addition & 0 deletions go/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ lib.h
# benchmarking results
benchmarks/results/**
benchmarks/gobenchmarks.json
benchmarks/benchmarks
24 changes: 11 additions & 13 deletions go/api/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,13 @@ func (client *baseClient) executeCommand(requestType C.RequestType, args []strin
return nil, &ClosingError{"ExecuteCommand failed. The client is closed."}
}

cArgs, argLengths := toCStrings(args)
var cArgsPtr *C.uintptr_t = nil
var argLengthsPtr *C.ulong = nil
if len(args) > 0 {
cArgs, argLengths := toCStrings(args)
cArgsPtr = &cArgs[0]
argLengthsPtr = &argLengths[0]
}

resultChannel := make(chan payload)
resultChannelPtr := uintptr(unsafe.Pointer(&resultChannel))
Expand All @@ -112,8 +118,8 @@ func (client *baseClient) executeCommand(requestType C.RequestType, args []strin
C.uintptr_t(resultChannelPtr),
uint32(requestType),
C.size_t(len(args)),
&cArgs[0],
&argLengths[0],
cArgsPtr,
argLengthsPtr,
)
payload := <-resultChannel
if payload.error != nil {
Expand Down Expand Up @@ -160,23 +166,15 @@ func (client *baseClient) Get(key string) (string, error) {
}

func (client *baseClient) MSet(keyValueMap map[string]string) (string, error) {
flat := []string{}
for key, value := range keyValueMap {
flat = append(flat, key, value)
}
result, err := client.executeCommand(C.MSet, flat)
result, err := client.executeCommand(C.MSet, utils.MapToString(keyValueMap))
if err != nil {
return "", err
}
return handleStringResponse(result), nil
}

func (client *baseClient) MSetNX(keyValueMap map[string]string) (bool, error) {
flat := []string{}
for key, value := range keyValueMap {
flat = append(flat, key, value)
}
result, err := client.executeCommand(C.MSetNX, flat)
result, err := client.executeCommand(C.MSetNX, utils.MapToString(keyValueMap))
if err != nil {
return false, err
}
Expand Down
54 changes: 54 additions & 0 deletions go/api/glide_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package api
// #cgo LDFLAGS: -L../target/release -lglide_rs
// #include "../lib.h"
import "C"
import "github.com/valkey-io/valkey-glide/go/glide/utils"

// GlideClient is a client used for connection in Standalone mode.
type GlideClient struct {
Expand Down Expand Up @@ -41,3 +42,56 @@ func (client *GlideClient) CustomCommand(args []string) (interface{}, error) {
}
return handleStringOrNullResponse(res), nil
}

// Sets configuration parameters to the specified values.
//
// Note: Prior to Version 7.0.0, only one parameter can be send.
//
// Parameters:
//
// parameters - A map consisting of configuration parameters and their respective values to set.
//
// Return value:
//
// "OK" if all configurations have been successfully set. Otherwise, raises an error.
//
// For example:
//
// result, err := client.ConfigSet(map[string]string{"timeout": "1000", "maxmemory": "1GB"})
// result: "OK"
//
// [valkey.io]: https://valkey.io/commands/config-set/
func (client *GlideClient) ConfigSet(parameters map[string]string) (string, error) {
result, err := client.executeCommand(C.ConfigSet, utils.MapToString(parameters))
if err != nil {
return "", err
}
return handleStringResponse(result), nil
}

// Gets the values of configuration parameters.
//
// Note: Prior to Version 7.0.0, only one parameter can be send.
//
// Parameters:
//
// args - A slice of configuration parameter names to retrieve values for.
//
// Return value:
//
// A map of values corresponding to the configuration parameters.
//
// For example:
//
// result, err := client.ConfigGet([]string{"timeout" , "maxmemory"})
// result["timeout"] = "1000"
// result["maxmemory"] = "1GB"
//
// [valkey.io]: https://valkey.io/commands/config-get/
func (client *GlideClient) ConfigGet(args []string) (map[string]string, error) {
res, err := client.executeCommand(C.ConfigGet, args)
if err != nil {
return nil, err
}
return handleStringToStringMapResponse(res), nil
}
17 changes: 13 additions & 4 deletions go/api/response_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@ func handleStringOrNullResponse(response *C.struct_CommandResponse) string {

func handleStringArrayResponse(response *C.struct_CommandResponse) []string {
defer C.free_command_response(response)
var len []C.long
len = append(len, unsafe.Slice(response.array_elements_len, response.array_value_len)...)
var slice []string
for k, v := range unsafe.Slice(response.array_value, response.array_value_len) {
slice = append(slice, convertCharArrayToString(v, len[k]))
for _, v := range unsafe.Slice(response.array_value, response.array_value_len) {
slice = append(slice, convertCharArrayToString(v.string_value, v.string_value_len))
}
return slice
}
Expand All @@ -56,3 +54,14 @@ func handleBooleanResponse(response *C.struct_CommandResponse) bool {
defer C.free_command_response(response)
return bool(response.bool_value)
}

func handleStringToStringMapResponse(response *C.struct_CommandResponse) map[string]string {
defer C.free_command_response(response)
m := make(map[string]string, response.array_value_len)
for _, v := range unsafe.Slice(response.array_value, response.array_value_len) {
key := convertCharArrayToString(v.map_key.string_value, v.map_key.string_value_len)
value := convertCharArrayToString(v.map_value.string_value, v.map_value.string_value_len)
m[key] = value
}
return m
}
17 changes: 13 additions & 4 deletions go/benchmarks/benchmarking.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func executeBenchmarks(runConfig *runConfiguration, connectionSettings *connecti
return nil
}

var key_count int64 = 1

func runSingleBenchmark(config *benchmarkConfig) error {
fmt.Printf("Running benchmarking for %s client:\n", config.clientName)
fmt.Printf(
Expand Down Expand Up @@ -228,13 +230,13 @@ const (
func getActions(dataSize int) map[string]operations {
actions := map[string]operations{
getExisting: func(client benchmarkClient) (string, error) {
return client.get(keyFromExistingKeyspace())
return client.get(keyFromExistingKeyspace(getExisting))
},
getNonExisting: func(client benchmarkClient) (string, error) {
return client.get(keyFromNewKeyspace())
},
set: func(client benchmarkClient) (string, error) {
return client.set(keyFromExistingKeyspace(), strings.Repeat("0", dataSize))
return client.set(keyFromExistingKeyspace(set), strings.Repeat("0", dataSize))
},
}

Expand All @@ -246,8 +248,15 @@ const (
sizeExistingKeyspace = 3000000
)

func keyFromExistingKeyspace() string {
randNum, err := rand.Int(rand.Reader, big.NewInt(sizeExistingKeyspace))
func keyFromExistingKeyspace(action string) string {
if action == set {

if key_count < sizeNewKeyspace-1 {
key_count = key_count + 1
}
return fmt.Sprint(key_count)
}
randNum, err := rand.Int(rand.Reader, big.NewInt(key_count))
if err != nil {
log.Fatal("Error while generating random number for existing keyspace: ", err)
}
Expand Down
6 changes: 3 additions & 3 deletions go/integTest/glide_test_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type GlideTestSuite struct {
suite.Suite
standalonePorts []int
clusterPorts []int
redisVersion string
serverVersion string
clients []*api.GlideClient
clusterClients []*api.GlideClusterClient
}
Expand Down Expand Up @@ -55,8 +55,8 @@ func (suite *GlideTestSuite) SetupSuite() {
suite.T().Fatal(err.Error())
}

suite.redisVersion = extractServerVersion(string(byteOutput))
suite.T().Logf("Redis version = %s", suite.redisVersion)
suite.serverVersion = extractServerVersion(string(byteOutput))
suite.T().Logf("Detected server version = %s", suite.serverVersion)
}

func extractPorts(suite *GlideTestSuite, output string) []int {
Expand Down
48 changes: 48 additions & 0 deletions go/integTest/standalone_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,51 @@ func (suite *GlideTestSuite) TestCustomCommand_closedClient() {
assert.NotNil(suite.T(), err)
assert.IsType(suite.T(), &api.ClosingError{}, err)
}

func (suite *GlideTestSuite) TestConfigSetAndGet_multipleArgs() {
client := suite.defaultClient()

if suite.serverVersion < "7.0.0" {
suite.T().Skip("This feature is added in version 7")
}
configMap := map[string]string{"timeout": "1000", "maxmemory": "1GB"}
resultConfigMap := map[string]string{"timeout": "1000", "maxmemory": "1073741824"}
result, err := client.ConfigSet(configMap)
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), result, "OK")

result2, err := client.ConfigGet([]string{"timeout", "maxmemory"})
assert.Nil(suite.T(), err)
assert.Equal(suite.T(), resultConfigMap, result2)
}

func (suite *GlideTestSuite) TestConfigSetAndGet_noArgs() {
client := suite.defaultClient()

configMap := map[string]string{}

result, err := client.ConfigSet(configMap)
assert.Equal(suite.T(), "", result)
assert.NotNil(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

result2, err := client.ConfigGet([]string{})
assert.Nil(suite.T(), result2)
assert.NotNil(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)
}

func (suite *GlideTestSuite) TestConfigSetAndGet_invalidArgs() {
client := suite.defaultClient()

configMap := map[string]string{"time": "1000"}

result, err := client.ConfigSet(configMap)
assert.Equal(suite.T(), "", result)
assert.NotNil(suite.T(), err)
assert.IsType(suite.T(), &api.RequestError{}, err)

result2, err := client.ConfigGet([]string{"time"})
assert.Equal(suite.T(), map[string]string{}, result2)
assert.Nil(suite.T(), err)
}
Loading
Loading