Skip to content

Commit

Permalink
Redis integration tests launcher (#183)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniele Salvatore Albano <d.albano@gmail.com>
  • Loading branch information
Valkyrie00 and danielealbano authored Aug 20, 2022
1 parent 224b39e commit fa0b8ae
Showing 11 changed files with 1,322 additions and 23 deletions.
4 changes: 2 additions & 2 deletions src/config.c
Original file line number Diff line number Diff line change
@@ -136,8 +136,8 @@ bool config_validate_after_load(

// Validate ad-hoc protocol settings (redis)
if (module.type == CONFIG_MODULE_TYPE_REDIS) {
if (module.redis->max_key_length > SLAB_OBJECT_SIZE_MAX) {
LOG_E(TAG, "The allowed maximum value of max_key_length is <%u>", SLAB_OBJECT_SIZE_MAX);
if (module.redis->max_key_length > SLAB_OBJECT_SIZE_MAX - 1) {
LOG_E(TAG, "The allowed maximum value of max_key_length is <%u>", SLAB_OBJECT_SIZE_MAX - 1);
return_result = false;
}
}
39 changes: 29 additions & 10 deletions src/module/redis/module_redis_command.c
Original file line number Diff line number Diff line change
@@ -349,19 +349,28 @@ bool module_redis_command_process_argument_stream_data(

size_t written_data = 0;
do {
// When all the data have been written current_chunk.index will always match chunk_sequence_count, this assert
// catch the cases when this function is invoked to write data even if all the chunks have been written.
assert(string->current_chunk.index < chunk_sequence->count);

storage_db_chunk_info_t *chunk_info = storage_db_chunk_sequence_get(
chunk_sequence,
string->current_chunk.index);

size_t chunk_length_to_write = chunk_length - written_data;
size_t chunk_available_size = chunk_info->chunk_length - string->current_chunk.offset;
size_t chunk_data_to_write_length =
chunk_length > chunk_available_size ? chunk_available_size : chunk_length;
chunk_length_to_write > chunk_available_size ? chunk_available_size : chunk_length_to_write;

// There should always be something to write
assert(chunk_length_to_write > 0);
assert(chunk_data_to_write_length > 0);

bool res = storage_db_chunk_write(
connection_context->db,
chunk_info,
string->current_chunk.offset,
chunk_data,
chunk_data + written_data,
chunk_data_to_write_length);

if (!res) {
@@ -378,6 +387,7 @@ bool module_redis_command_process_argument_stream_data(

if (string->current_chunk.offset == chunk_info->chunk_length) {
string->current_chunk.index++;
string->current_chunk.offset = 0;
}
} while(written_data < chunk_length);

@@ -870,14 +880,23 @@ bool module_redis_command_stream_entry_with_multiple_chunks(
buffer_to_send_length = chunk_info->chunk_length;
}

// TODO: check if it's the last chunk and, if yes, if it would fit in the send buffer with the protocol
// bits that have to be sent later without doing an implicit flush
if (network_send_direct(
network_channel,
buffer_to_send,
buffer_to_send_length) != NETWORK_OP_RESULT_OK) {
return false;
}
size_t sent_data = 0;
do {
size_t data_available_to_send_length = buffer_to_send_length - sent_data;
size_t data_to_send_length = data_available_to_send_length > NETWORK_CHANNEL_PACKET_SIZE
? NETWORK_CHANNEL_PACKET_SIZE : data_available_to_send_length;

// TODO: check if it's the last chunk and, if yes, if it would fit in the send buffer with the protocol
// bits that have to be sent later without doing an implicit flush
if (network_send_direct(
network_channel,
buffer_to_send + sent_data,
data_to_send_length) != NETWORK_OP_RESULT_OK) {
return false;
}

sent_data += data_to_send_length;
} while (sent_data < buffer_to_send_length);
}

send_buffer = send_buffer_start = network_send_buffer_acquire_slice(
12 changes: 2 additions & 10 deletions src/network/channel/network_channel.h
Original file line number Diff line number Diff line change
@@ -5,17 +5,9 @@
extern "C" {
#endif

#define NETWORK_CHANNEL_PACKET_SIZE (8 * 1024)

// The NETWORK_CHANNEL_RECV_BUFFER_SIZE has to be twice the NETWORK_CHANNEL_PACKET_SIZE to ensure that it's always
// possible to read a full packet in addition to any partially received data while processing the buffer and that there
// is enough room to avoid copying continuously the data at the beginning (a streaming parser is being used so there
// maybe data that need still to be parsed)
#define NETWORK_CHANNEL_PACKET_SIZE (32 * 1024)
#define NETWORK_CHANNEL_RECV_BUFFER_SIZE (NETWORK_CHANNEL_PACKET_SIZE * 2)

// Do not lower, to improve the performances the code expects to be able to send up to this amount of data, and do
// not increase as the slab allocator supports only up to 64kb.
#define NETWORK_CHANNEL_SEND_BUFFER_SIZE (64 * 1024)
#define NETWORK_CHANNEL_SEND_BUFFER_SIZE NETWORK_CHANNEL_PACKET_SIZE

typedef char network_channel_buffer_data_t;

2 changes: 1 addition & 1 deletion src/storage/db/storage_db.h
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@ extern "C" {
//#define STORAGE_DB_SHARD_MAGIC_NUMBER_LOW 0x5241000000000000

#define STORAGE_DB_SHARD_VERSION 1
#define STORAGE_DB_CHUNK_MAX_SIZE (64 * 1024)
#define STORAGE_DB_CHUNK_MAX_SIZE ((64 * 1024) - 1)

// This magic value defines the size of the ring buffer used to keep in memory data long enough to be sure they are not
// being in use anymore.
167 changes: 167 additions & 0 deletions tests/integration_tests/redis_server/client.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Copyright (C) 2018-2022 Vito Castellano
# All rights reserved.
#
# This software may be modified and distributed under the terms
# of the BSD license. See the LICENSE file for details.

proc spawn_client {} {
puts "\n** Spawn Client"
cleanup

# Set global keys
set ::idle_clients {}
set ::active_clients {}
array set ::active_clients_task {}
array set ::clients_start_time {}
set ::clients_time_history {}
set ::failed_tests {}
set ::clients_pids {}

# Finding free socketport
set client_socket_port [find_available_port [expr {$::socket_port - 32}] 32]

if {$::verbose} { puts -nonewline "Starting socket server at port $client_socket_port... "}
socket -server accept_test_clients -myaddr $::socket_host $client_socket_port
if {$::verbose} {puts "OK"}

if {$::verbose} { puts "Will be spawned: $::numclients clients... "}
set start_port $::socket_port
set port_count [expr {$::portcount / $::numclients}]
for {set j 0} {$j < $::numclients} {incr j} {
set p [exec $::tclsh [info script] {*}$::argv --client $client_socket_port &]
if {$::verbose} { puts "- Creating instances with PID: $p"}
lappend ::clients_pids $p
incr start_port $port_count
}
if {$::verbose} {puts "Spawning Clients... OK \n"}

# Enter the event loop to handle clients I/O
after 100 client_watcher
vwait forever
}

proc client_watcher {} {
set elapsed [expr {[clock seconds]-$::last_progress}]

if {$elapsed > $::timeout} {
puts "\[TIMEOUT\]: clients state report follows."
kill_clients
the_end
}

after 100 client_watcher
}

proc accept_test_clients {fd addr port} {
fconfigure $fd -encoding binary
fileevent $fd readable [list read_from_test_client $fd]
}

proc read_from_test_client fd {
set bytes [gets $fd]
set payload [read $fd $bytes]
foreach {status data elapsed} $payload break
set ::last_progress [clock seconds]

if {$status eq {ready}} {
if {$::verbose} { puts "Client with pid \[$data\] recieve \[$status\] status" }
signal_idle_client $fd

} elseif {$status eq {done}} {
set elapsed [expr {[clock seconds]-$::clients_start_time($fd)}]
set all_tests_count [llength $::all_tests]
set running_tests_count [expr {[llength $::active_clients]-1}]
set completed_tests_count [expr {$::next_test-$running_tests_count}]

puts "\[$completed_tests_count/$all_tests_count $status\]: $data ($elapsed seconds)"
lappend ::clients_time_history $elapsed $data

signal_idle_client $fd
set ::active_clients_task($fd) "(DONE) $data"

} elseif {$status eq {ok}} {
if {$::verbose} { puts "\[$status\]: $data ($elapsed ms)" }
set ::active_clients_task($fd) "(OK) $data"

} elseif {$status eq {skip}} {
if {$::verbose} { puts "\[$status\]: $data" }

} elseif {$status eq {ignore}} {
if {$::verbose} { puts "\[$status\]: $data" }

} elseif {$status eq {err}} {
set err "\[$status\]: $data"
puts $err
lappend ::failed_tests $err
set ::active_clients_task($fd) "(ERR) $data"

} elseif {$status eq {exception}} {
puts "\[$status\]: $data"
kill_clients
kill_server $::srv
exit 1
} elseif {$status eq {testing}} {
set ::active_clients_task($fd) "(IN PROGRESS) $data"

} elseif {$status eq {server-spawning}} {
set ::active_clients_task($fd) "(SPAWNING SERVER) $data"

} elseif {$status eq {server-spawned}} {
lappend ::active_servers $data
set ::active_clients_task($fd) "(SPAWNED SERVER) pid:$data"

} elseif {$status eq {server-killing}} {
set ::active_clients_task($fd) "(KILLING SERVER) pid:$data"

} elseif {$status eq {server-killed}} {
set ::active_servers [lsearch -all -inline -not -exact $::active_servers $data]
set ::active_clients_task($fd) "(KILLED SERVER) pid:$data"

} elseif {$status eq {run_solo}} {
lappend ::run_solo_tests $data

} else {
if {$::verbose} { puts "\[$status\]: $data" }
}
}

proc signal_idle_client fd {
# Remove this fd from the list of active clients.
set ::active_clients \
[lsearch -all -inline -not -exact $::active_clients $fd]

# New unit to process?
if {$::next_test != [llength $::all_tests]} {
if {$::verbose} {
puts "Testing \[[lindex $::all_tests $::next_test]\] and ASSIGNED to client : $fd"
set ::active_clients_task($fd) "ASSIGNED: $fd ([lindex $::all_tests $::next_test])"
}

set ::clients_start_time($fd) [clock seconds]
send_data_packet $fd run [lindex $::all_tests $::next_test]
lappend ::active_clients $fd
incr ::next_test
} elseif {[llength $::run_solo_tests] != 0 && [llength $::active_clients] == 0} {
if {$::verbose} {
puts "Testing solo test and ASSIGNED to client : $fd"
set ::active_clients_task($fd) "ASSIGNED: $fd solo test"
}

set ::clients_start_time($fd) [clock seconds]
send_data_packet $fd run_code [lpop ::run_solo_tests]
lappend ::active_clients $fd
} else {
lappend ::idle_clients $fd
set ::active_clients_task($fd) "SLEEPING, no more units to assign"
if {[llength $::active_clients] == 0} {
linespacer "#"
the_end
}
}
}

proc kill_clients {} {
foreach p $::clients_pids {
catch {exec kill $p}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
start_server {tags {"example"}} {
test {SET and GET example item} {
r set x example
r get x
} {example}
}
Loading

0 comments on commit fa0b8ae

Please sign in to comment.