Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/relay slow path #158

Merged
merged 23 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
3693244
copied perf test from sendmmsg branch
jthomas43 Apr 19, 2023
7c6c578
trailing whitespace
jthomas43 Apr 21, 2023
5c07467
Merge branch 'master' of github.com:jthomas43/libudx
jthomas43 May 26, 2023
2df62eb
Merge branch 'holepunchto:main' into main
jthomas43 May 30, 2023
400de49
Merge branch 'holepunchto:main' into main
jthomas43 Aug 11, 2023
944cd60
Merge branch 'holepunchto:main' into main
jthomas43 Sep 12, 2023
10f535b
Merge branch 'holepunchto:main' into main
jthomas43 Sep 15, 2023
17a53fa
Merge branch 'holepunchto:main' into main
jthomas43 Sep 22, 2023
adcec59
Merge branch 'holepunchto:main' into main
jthomas43 Sep 26, 2023
df3e9a3
fix some warnings, add warnings to build
jthomas43 Sep 27, 2023
7e32418
Merge branch 'holepunchto:main' into main
jthomas43 Sep 27, 2023
4920342
Merge branch 'main' into feature/fix-warnings
jthomas43 Sep 27, 2023
4b9b267
cleanup warnings
jthomas43 Sep 28, 2023
a99f1c0
Merge branch 'holepunchto:main' into main
jthomas43 Oct 3, 2023
c21d187
Merge branch 'main' of github.com:jthomas43/libudx
jthomas43 Oct 3, 2023
ff35d82
Merge branch 'holepunchto:main' into main
jthomas43 Oct 4, 2023
7dd871c
Merge branch 'main' of github.com:jthomas43/libudx
jthomas43 Oct 4, 2023
912efdb
Merge branch 'holepunchto:main' into main
jthomas43 Oct 6, 2023
b5c6fdb
Merge branch 'main' of github.com:jthomas43/libudx
jthomas43 Oct 6, 2023
b767a10
relay slow path fix
jthomas43 Oct 6, 2023
ce9a8c3
forgot a file..
jthomas43 Oct 6, 2023
b93e646
clean up
jthomas43 Oct 6, 2023
f525e95
add file
jthomas43 Oct 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/udx.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,16 +493,18 @@ init_stream_packet (udx_packet_t *pkt, int type, udx_stream_t *stream, const uv_
// returns 1 on success, zero if packet can't be promoted to a probe packet
static int
mtu_probeify_packet (udx_packet_t *pkt, int wanted_size) {
assert(pkt->bufs_len == 2);
assert(pkt->header[3] == 0);
assert(wanted_size > pkt->size);

if (pkt->bufs_len != 2 || pkt->header[3] != 0) {
return 0;
}
int header_size = (pkt->dest.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE) - 20;
int padding_size = wanted_size - (pkt->size + (pkt->dest.ss_family == AF_INET ? UDX_IPV4_HEADER_SIZE : UDX_IPV6_HEADER_SIZE) - 20);
if (padding_size > 255) {
return 0;
}
debug_printf("mtu: probeify seq=%d size=%u wanted=%d padding=%d\n", pkt->seq, pkt->size + header_size, wanted_size, padding_size);
static char probe_data[256] = "";
static char probe_data[256] = {0};
pkt->bufs[2] = pkt->bufs[1];
pkt->bufs[1].len = padding_size;
pkt->bufs[1].base = probe_data;
Expand Down Expand Up @@ -1107,7 +1109,7 @@ process_data_packet (udx_stream_t *stream, int type, uint32_t seq, char *data, s
}

static int
relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32_t seq, uint32_t ack) {
relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint8_t data_offset, uint32_t seq, uint32_t ack) {
stream->seq = seq_max(stream->seq, seq);

udx_stream_t *relay = stream->relay_to;
Expand All @@ -1124,7 +1126,9 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32
b.base += UDX_HEADER_SIZE;
b.len -= UDX_HEADER_SIZE;

udx_packet_t *pkt = malloc(sizeof(udx_packet_t));
udx_packet_t *pkt = malloc(sizeof(udx_packet_t) + b.len);
memcpy((char *) pkt + sizeof(udx_packet_t), b.base, b.len);
b.base = (char *) pkt + sizeof(udx_packet_t);

init_stream_packet(pkt, type, relay, &b);

Expand All @@ -1134,6 +1138,7 @@ relay_packet (udx_stream_t *stream, char *buf, ssize_t buf_len, int type, uint32

pkt->status = UDX_PACKET_SENDING;
pkt->type = UDX_PACKET_STREAM_RELAY;
pkt->header[3] = data_offset;
pkt->seq = seq;

pkt->send_queue = &relay->socket->send_queue;
Expand Down Expand Up @@ -1181,7 +1186,7 @@ process_packet (udx_socket_t *socket, char *buf, ssize_t buf_len, struct sockadd
if (stream->on_firewall(stream, socket, addr)) return 1;
}

if (stream->relay_to) return relay_packet(stream, buf, buf_len, type, seq, ack);
if (stream->relay_to) return relay_packet(stream, buf, buf_len, type, data_offset, seq, ack);

buf += UDX_HEADER_SIZE;
buf_len -= UDX_HEADER_SIZE;
Expand Down
1 change: 1 addition & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ list(APPEND tests
stream-write-read-ipv6
stream-write-read-perf
stream-change-remote
stream-multiple
)

list(APPEND skipped_tests
Expand Down
17 changes: 17 additions & 0 deletions test/helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef TEST_HELPERS_H
#define TEST_HELPERS_H

#define HASH_INIT 5381

static inline uint64_t
hash (uint64_t prev, uint8_t *data, int len) {
uint64_t hash = prev;

for (int i = 0; i < len; i++) {
hash = ((hash << 5) + hash) + data[i];
}

return hash;
}

#endif // TEST_HELPERS_H
14 changes: 13 additions & 1 deletion test/stream-change-remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
#include <string.h>

#include "../include/udx.h"
#include "helpers.h"

#define NBYTES_TO_SEND 100000
#define NBYTES_TO_SEND 1000000

uv_loop_t loop;
udx_t udx;
Expand Down Expand Up @@ -34,6 +35,9 @@ int remote_changed_called = 0;

size_t nbytes_read;

size_t read_hash = HASH_INIT;
size_t write_hash = HASH_INIT;

void
on_ack (udx_stream_write_t *r, int status, int unordered) {
uv_stop(&loop);
Expand Down Expand Up @@ -64,6 +68,8 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {

nbytes_read += read_len;

read_hash = hash(read_hash, buf->base, read_len);

// swap to relay 1/3 of the way into the stream

if (nbytes_read > (NBYTES_TO_SEND / 3) && !changed) {
Expand Down Expand Up @@ -152,12 +158,18 @@ main () {
assert(e == 0);

uv_buf_t buf = uv_buf_init(malloc(NBYTES_TO_SEND), NBYTES_TO_SEND);

write_hash = hash(write_hash, buf.base, buf.len);

e = udx_stream_write(&req, &dstream, &buf, 1, on_ack);
assert(e && "drained");

uv_run(&loop, UV_RUN_DEFAULT);

assert(ack_called && read_called && remote_changed_called && nbytes_read == NBYTES_TO_SEND);

assert(read_hash == write_hash);
printf("read_hash=%lu write_hash=%lu\n", read_hash, write_hash);

return 0;
}
120 changes: 120 additions & 0 deletions test/stream-multiple.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include "../include/udx.h"
#include "helpers.h"

#define NBYTES_TO_SEND 10000000
#define NSTREAMS 16

uv_loop_t loop;
udx_t udx;

struct sender {
struct sockaddr_in addr;
udx_socket_t usock;
udx_stream_t stream;
udx_stream_write_t write;

// size_t nbytes_written;
size_t write_hash;

bool ack;
} sender[NSTREAMS];

struct receiver {
struct sockaddr_in addr;
udx_socket_t usock;
udx_stream_t stream;

size_t nbytes_read;
size_t read_hash;
} receiver[NSTREAMS];

static bool
all_acked () {
for (int i = 0; i < NSTREAMS; i++) {
if (sender[i].ack == false) {
return false;
}
}
return true;
}

void
on_ack (udx_stream_write_t *r, int status, int unordered) {
struct sender *s = (struct sender *) ((char *) r - offsetof(struct sender, write));
s->ack = true;

if (all_acked()) {
uv_stop(&loop);
}
}

void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
struct receiver *r = (struct receiver *) ((char *) handle - offsetof(struct receiver, stream));

r->nbytes_read += read_len;
r->read_hash = hash(r->read_hash, buf->base, read_len);
}

int
main () {
int e;

uv_loop_init(&loop);

e = udx_init(&loop, &udx);
assert(e == 0);

uv_buf_t buf = uv_buf_init(malloc(NBYTES_TO_SEND), NBYTES_TO_SEND);

size_t write_hash = HASH_INIT;

write_hash = hash(write_hash, buf.base, buf.len);

for (int i = 0; i < NSTREAMS; i++) {
int sender_id = i;
int receiver_id = NSTREAMS + i;

receiver[i].read_hash = HASH_INIT;
e = udx_socket_init(&udx, &sender[i].usock);
assert(e == 0);
uv_ip4_addr("127.0.0.1", 8000 + i, &sender[i].addr);
e = udx_socket_bind(&sender[i].usock, (struct sockaddr *) &sender[i].addr, 0);
assert(e == 0);
e = udx_stream_init(&udx, &sender[i].stream, sender_id, NULL);

udx_socket_init(&udx, &receiver[i].usock);
uv_ip4_addr("127.0.0.1", 8100 + i, &receiver[i].addr);
e = udx_socket_bind(&receiver[i].usock, (struct sockaddr *) &receiver[i].addr, 0);
assert(e == 0);
e = udx_stream_init(&udx, &receiver[i].stream, receiver_id, NULL);
assert(e == 0);

e = udx_stream_read_start(&receiver[i].stream, on_read);
assert(e == 0);
sender[i].write_hash = write_hash;

e = udx_stream_connect(&sender[i].stream, &sender[i].usock, receiver_id, (struct sockaddr *) &receiver[i].addr);
assert(e == 0);

e = udx_stream_connect(&receiver[i].stream, &receiver[i].usock, sender_id, (struct sockaddr *) &sender[i].addr);
assert(e == 0);

udx_stream_write(&sender[i].write, &sender[i].stream, &buf, 1, on_ack);
}

uv_run(&loop, UV_RUN_DEFAULT);

for (int i = 0; i < NSTREAMS; i++) {
printf("%d: send_hash=%x receive_hash=%x sent_bytes=%lu recv_bytes=%lu\n", i, sender[i].write_hash, receiver[i].read_hash, NBYTES_TO_SEND, receiver[i].nbytes_read);
assert(sender[i].write_hash == receiver[i].read_hash);
assert(receiver[i].nbytes_read == NBYTES_TO_SEND);
}

return 0;
}
27 changes: 24 additions & 3 deletions test/stream-relay.c
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

#include "../include/udx.h"
#include "helpers.h"

#define NBYTES_TO_SEND 1000000

uv_loop_t loop;
udx_t udx;
Expand All @@ -28,6 +32,11 @@ udx_stream_write_t req;
bool ack_called = false;
bool read_called = false;

size_t write_hash = HASH_INIT;
size_t read_hash = HASH_INIT;

size_t nbytes_read;

void
on_ack (udx_stream_write_t *r, int status, int unordered) {
assert(&req == r);
Expand All @@ -41,10 +50,15 @@ on_ack (udx_stream_write_t *r, int status, int unordered) {

void
on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
assert(buf->len == 5);
assert(buf->len == read_len);
assert(memcmp(buf->base, "hello", 5) == 0);

if (nbytes_read == 0) {
printf("read_len=%d\n", read_len);
assert(memcmp(buf->base, "hello", 5) == 0);
}
read_hash = hash(read_hash, buf->base, read_len);

nbytes_read += read_len;
read_called = true;
}

Expand Down Expand Up @@ -118,12 +132,19 @@ main () {
e = udx_stream_connect(&dstream, &dsock, 3, (struct sockaddr *) &caddr);
assert(e == 0);

uv_buf_t buf = uv_buf_init("hello", 5);
uv_buf_t buf = uv_buf_init(calloc(NBYTES_TO_SEND, 1), NBYTES_TO_SEND);

memcpy(buf.base, "hello", 5);

write_hash = hash(write_hash, buf.base, buf.len);

udx_stream_write(&req, &dstream, &buf, 1, on_ack);

uv_run(&loop, UV_RUN_DEFAULT);

assert(ack_called && read_called);

assert(nbytes_read == NBYTES_TO_SEND && read_hash == write_hash);

return 0;
}
14 changes: 13 additions & 1 deletion test/stream-write-read-perf.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <string.h>

#include "../include/udx.h"
#include "helpers.h"

uv_loop_t loop;
udx_t udx;
Expand Down Expand Up @@ -31,6 +32,9 @@ struct {
int finished;
} stats;

uint64_t read_hash = HASH_INIT;
uint64_t write_hash = HASH_INIT;

void
on_ack (udx_stream_write_t *r, int status, int unordered) {
printf("write acked, status=%d %s\n", status, status == UV_ECANCELED ? "(UV_ECANCELED)" : "");
Expand All @@ -43,6 +47,10 @@ on_read (udx_stream_t *handle, ssize_t read_len, const uv_buf_t *buf) {
stats.bytes_read += read_len;
stats.last_read_ms = uv_hrtime() / 1000000;

assert(read_len == buf->len);

read_hash = hash(read_hash, buf->base, read_len);

if (stats.bytes_read == options.size_bytes) {
printf("read all bytes\n");
}
Expand Down Expand Up @@ -113,10 +121,12 @@ main () {

printf("generating data ...\n");

options.size_bytes = 2 * 1024 * 1024 * 1024L;
options.size_bytes = 50 * 1024 * 1024L;

char *data = calloc(options.size_bytes, 1);

write_hash = hash(write_hash, data, options.size_bytes);

assert(data != NULL && "malloc");

printf("writing data\n");
Expand All @@ -131,5 +141,7 @@ main () {

// just for valgrind
free(data);
printf("readhash=%x writehash=%x\n", read_hash, write_hash);
assert(read_hash == write_hash);
return 0;
}
Loading