Skip to content

Commit

Permalink
Fix/relay slow path (#158)
Browse files Browse the repository at this point in the history
* copied perf test from sendmmsg branch

* trailing whitespace

* fix some warnings, add warnings to build

* cleanup warnings

* relay slow path fix

* forgot a file..

* clean up

* add file
  • Loading branch information
jthomas43 committed Oct 11, 2023
1 parent 3b197ec commit cd1e4e3
Show file tree
Hide file tree
Showing 7 changed files with 199 additions and 11 deletions.
17 changes: 11 additions & 6 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,16 +493,18 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_
// returns 1 on success, zero if packet can't be promoted to a probe packet
static int
mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) {
assert(pkt->bufs_len == 2);
assert(pkt->header[3] == 0);
assert(wanted_size > pkt->size);

if (pkt->bufs_len != 2 || pkt->header[3] != 0) {
return 0;
}
int header_size = (pkt->dest.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE) - 20;
int padding_size = wanted_size - (pkt->size + (pkt->dest.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE) - 20);
if (padding_size > 255) {
return 0;
}
debug_printf("mtu: probeify seq=%d size=%u wanted=%d padding=%d\n", pkt->seq, pkt->size + header_size, wanted_size, padding_size);
static char probe_data[256] = "";
static char probe_data[256] = {0};
pkt->bufs[2] = pkt->bufs[1];
pkt->bufs[1].len = padding_size;
pkt->bufs[1].base = probe_data;
Expand Down Expand Up @@ -1107,7 +1109,7 @@ process_data_packet (udx_stream_t *stream, int type, uint32_t seq, char *data, s
}

static int
relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32_t seq, uint32_t ack) {
relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint8_t data_offset, uint32_t seq, uint32_t ack) {
stream->seq = seq_max(stream->seq, seq);

udx_stream_t *relay = stream->relay_to;
Expand All @@ -1124,7 +1126,9 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32
b.base += UDX_HEADER_SIZE;
b.len -= UDX_HEADER_SIZE;

udx_packet_t *pkt = malloc(sizeof(udx_packet_t));
udx_packet_t *pkt = malloc(sizeof(udx_packet_t) + b.len);
memcpy((char *) pkt + sizeof(udx_packet_t), b.base, b.len);
b.base = (char *) pkt + sizeof(udx_packet_t);

init_stream_packet(pkt, type, relay, &b);

Expand All @@ -1134,6 +1138,7 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32

pkt->status = UDX_PACKET_SENDING;
pkt->type = UDX_PACKET_STREAM_RELAY;
pkt->header[3] = data_offset;
pkt->seq = seq;

pkt->send_queue = &relay->socket->send_queue;
Expand Down Expand Up @@ -1181,7 +1186,7 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
if (stream->on_firewall(stream, socket, addr)) return 1;
}

if (stream->relay_to) return relay_packet(stream, buf, buf_len, type, seq, ack);
if (stream->relay_to) return relay_packet(stream, buf, buf_len, type, data_offset, seq, ack);

buf += UDX_HEADER_SIZE;
buf_len -= UDX_HEADER_SIZE;
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ list(APPEND tests
stream-write-read-ipv6
stream-write-read-perf
stream-change-remote
stream-multiple
)

list(APPEND skipped_tests
Expand Down
17 changes: 17 additions & 0 deletions test/helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef TEST_HELPERS_H
#define TEST_HELPERS_H

#define HASH_INIT 5381

static inline uint64_t
hash (uint64_t prev, uint8_t *data, int len) {
uint64_t hash = prev;

for (int i = 0; i < len; i++) {
hash = ((hash << 5) + hash) + data[i];
}

return hash;
}

#endif // TEST_HELPERS_H
14 changes: 13 additions & 1 deletion test/stream-change-remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
#include <string.h>

#include "../include/udx.h"
#include "helpers.h"

#define NBYTES_TO_SEND 100000
#define NBYTES_TO_SEND 1000000

uv_loop_t loop;
udx_t udx;
Expand Down Expand Up @@ -34,6 +35,9 @@ int remote_changed_called = 0;

size_t nbytes_read;

size_t read_hash = HASH_INIT;
size_t write_hash = HASH_INIT;

void
on_ack (udx_stream_write_t *r, int status, int unordered) {
uv_stop(&loop);
Expand Down Expand Up @@ -64,6 +68,8 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {

nbytes_read += read_len;

read_hash = hash(read_hash, buf->base, read_len);

// swap to relay 1/3 of the way into the stream

if (nbytes_read > (NBYTES_TO_SEND / 3) && !changed) {
Expand Down Expand Up @@ -152,12 +158,18 @@ main () {
assert(e == 0);

uv_buf_t buf = uv_buf_init(malloc(NBYTES_TO_SEND), NBYTES_TO_SEND);

write_hash = hash(write_hash, buf.base, buf.len);

e = udx_stream_write(&req, &dstream, &buf, 1, on_ack);
assert(e && "drained");

uv_run(&loop, UV_RUN_DEFAULT);

assert(ack_called && read_called && remote_changed_called && nbytes_read == NBYTES_TO_SEND);

assert(read_hash == write_hash);
printf("read_hash=%lu write_hash=%lu\n", read_hash, write_hash);

return 0;
}
120 changes: 120 additions & 0 deletions test/stream-multiple.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include "../include/udx.h"
#include "helpers.h"

#define NBYTES_TO_SEND 10000000
#define NSTREAMS 16

uv_loop_t loop;
udx_t udx;

struct sender {
struct sockaddr_in addr;
udx_socket_t usock;
udx_stream_t stream;
udx_stream_write_t write;

// size_t nbytes_written;
size_t write_hash;

bool ack;
} sender[NSTREAMS];

struct receiver {
struct sockaddr_in addr;
udx_socket_t usock;
udx_stream_t stream;

size_t nbytes_read;
size_t read_hash;
} receiver[NSTREAMS];

static bool
all_acked () {
for (int i = 0; i < NSTREAMS; i++) {
if (sender[i].ack == false) {
return false;
}
}
return true;
}

void
on_ack (udx_stream_write_t *r, int status, int unordered) {
struct sender *s = (struct sender *) ((char *) r - offsetof(struct sender, write));
s->ack = true;

if (all_acked()) {
uv_stop(&loop);
}
}

void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
struct receiver *r = (struct receiver *) ((char *) handle - offsetof(struct receiver, stream));

r->nbytes_read += read_len;
r->read_hash = hash(r->read_hash, buf->base, read_len);
}

int
main () {
int e;

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
assert(e == 0);

uv_buf_t buf = uv_buf_init(malloc(NBYTES_TO_SEND), NBYTES_TO_SEND);

size_t write_hash = HASH_INIT;

write_hash = hash(write_hash, buf.base, buf.len);

for (int i = 0; i < NSTREAMS; i++) {
int sender_id = i;
int receiver_id = NSTREAMS + i;

receiver[i].read_hash = HASH_INIT;
e = udx_socket_init(&udx, &sender[i].usock);
assert(e == 0);
uv_ip4_addr("127.0.0.1", 8000 + i, &sender[i].addr);
e = udx_socket_bind(&sender[i].usock, (struct sockaddr *) &sender[i].addr, 0);
assert(e == 0);
e = udx_stream_init(&udx, &sender[i].stream, sender_id, NULL);

udx_socket_init(&udx, &receiver[i].usock);
uv_ip4_addr("127.0.0.1", 8100 + i, &receiver[i].addr);
e = udx_socket_bind(&receiver[i].usock, (struct sockaddr *) &receiver[i].addr, 0);
assert(e == 0);
e = udx_stream_init(&udx, &receiver[i].stream, receiver_id, NULL);
assert(e == 0);

e = udx_stream_read_start(&receiver[i].stream, on_read);
assert(e == 0);
sender[i].write_hash = write_hash;

e = udx_stream_connect(&sender[i].stream, &sender[i].usock, receiver_id, (struct sockaddr *) &receiver[i].addr);
assert(e == 0);

e = udx_stream_connect(&receiver[i].stream, &receiver[i].usock, sender_id, (struct sockaddr *) &sender[i].addr);
assert(e == 0);

udx_stream_write(&sender[i].write, &sender[i].stream, &buf, 1, on_ack);
}

uv_run(&loop, UV_RUN_DEFAULT);

for (int i = 0; i < NSTREAMS; i++) {
printf("%d: send_hash=%x receive_hash=%x sent_bytes=%lu recv_bytes=%lu\n", i, sender[i].write_hash, receiver[i].read_hash, NBYTES_TO_SEND, receiver[i].nbytes_read);
assert(sender[i].write_hash == receiver[i].read_hash);
assert(receiver[i].nbytes_read == NBYTES_TO_SEND);
}

return 0;
}
27 changes: 24 additions & 3 deletions test/stream-relay.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include "../include/udx.h"
#include "helpers.h"

#define NBYTES_TO_SEND 1000000

uv_loop_t loop;
udx_t udx;
Expand All @@ -28,6 +32,11 @@ udx_stream_write_t req;
bool ack_called = false;
bool read_called = false;

size_t write_hash = HASH_INIT;
size_t read_hash = HASH_INIT;

size_t nbytes_read;

void
on_ack (udx_stream_write_t *r, int status, int unordered) {
assert(&req == r);
Expand All @@ -41,10 +50,15 @@ on_ack (udx_stream_write_t *r, int status, int unordered) {

void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
assert(buf->len == 5);
assert(buf->len == read_len);
assert(memcmp(buf->base, "hello", 5) == 0);

if (nbytes_read == 0) {
printf("read_len=%d\n", read_len);
assert(memcmp(buf->base, "hello", 5) == 0);
}
read_hash = hash(read_hash, buf->base, read_len);

nbytes_read += read_len;
read_called = true;
}

Expand Down Expand Up @@ -118,12 +132,19 @@ main () {
e = udx_stream_connect(&dstream, &dsock, 3, (struct sockaddr *) &caddr);
assert(e == 0);

uv_buf_t buf = uv_buf_init("hello", 5);
uv_buf_t buf = uv_buf_init(calloc(NBYTES_TO_SEND, 1), NBYTES_TO_SEND);

memcpy(buf.base, "hello", 5);

write_hash = hash(write_hash, buf.base, buf.len);

udx_stream_write(&req, &dstream, &buf, 1, on_ack);

uv_run(&loop, UV_RUN_DEFAULT);

assert(ack_called && read_called);

assert(nbytes_read == NBYTES_TO_SEND && read_hash == write_hash);

return 0;
}
14 changes: 13 additions & 1 deletion test/stream-write-read-perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string.h>

#include "../include/udx.h"
#include "helpers.h"

uv_loop_t loop;
udx_t udx;
Expand Down Expand Up @@ -31,6 +32,9 @@ struct {
int finished;
} stats;

uint64_t read_hash = HASH_INIT;
uint64_t write_hash = HASH_INIT;

void
on_ack (udx_stream_write_t *r, int status, int unordered) {
printf("write acked, status=%d %s\n", status, status == UV_ECANCELED ? "(UV_ECANCELED)" : "");
Expand All @@ -43,6 +47,10 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
stats.bytes_read += read_len;
stats.last_read_ms = uv_hrtime() / 1000000;

assert(read_len == buf->len);

read_hash = hash(read_hash, buf->base, read_len);

if (stats.bytes_read == options.size_bytes) {
printf("read all bytes\n");
}
Expand Down Expand Up @@ -113,10 +121,12 @@ main () {

printf("generating data ...\n");

options.size_bytes = 2 * 1024 * 1024 * 1024L;
options.size_bytes = 50 * 1024 * 1024L;

char *data = calloc(options.size_bytes, 1);

write_hash = hash(write_hash, data, options.size_bytes);

assert(data != NULL && "malloc");

printf("writing data\n");
Expand All @@ -131,5 +141,7 @@ main () {

// just for valgrind
free(data);
printf("readhash=%x writehash=%x\n", read_hash, write_hash);
assert(read_hash == write_hash);
return 0;
}

0 comments on commit cd1e4e3

Please sign in to comment.