Skip to content

Commit

Permalink
allow fastrouter subscriptions to use hostnames
Browse files Browse the repository at this point in the history
  • Loading branch information
terencehonles committed Sep 7, 2023
1 parent aba6903 commit d346e1e
Showing 1 changed file with 57 additions and 21 deletions.
78 changes: 57 additions & 21 deletions core/subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,12 @@ static void send_subscription(int sfd, char *host, char *message, uint16_t messa
memset(&udp_addr, 0, sizeof(struct sockaddr_in));
udp_addr.sin_family = AF_INET;
udp_addr.sin_port = htons(atoi(udp_port + 1));
udp_addr.sin_addr.s_addr = inet_addr(host);
char *resolved = uwsgi_resolve_ip(host);
if (!resolved) {
uwsgi_error("send_subscription()/socket()");
return;
}
udp_addr.sin_addr.s_addr = inet_addr(resolved);
ret = sendto(fd, message, message_size, 0, (struct sockaddr *) &udp_addr, sizeof(udp_addr));
udp_port[0] = ':';
}
Expand All @@ -669,15 +674,15 @@ static void send_subscription(int sfd, char *host, char *message, uint16_t messa
close(fd);
}

static struct uwsgi_buffer *uwsgi_subscription_ub(char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
static struct uwsgi_buffer *uwsgi_subscription_ub(char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *address, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
struct uwsgi_buffer *ub = uwsgi_buffer_new(4096);

// make space for uwsgi header
ub->pos = 4;

if (uwsgi_buffer_append_keyval(ub, "key", 3, key, keysize))
goto end;
if (uwsgi_buffer_append_keyval(ub, "address", 7, socket_name, strlen(socket_name)))
if (uwsgi_buffer_append_keyval(ub, "address", 7, address, strlen(address)))
goto end;

if (uwsgi.subscribe_with_modifier1) {
Expand Down Expand Up @@ -752,16 +757,16 @@ static struct uwsgi_buffer *uwsgi_subscription_ub(char *key, size_t keysize, uin
return NULL;
}

void uwsgi_send_subscription_from_fd(int fd, char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
void uwsgi_send_subscription_from_fd(int fd, char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *address, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {

if (socket_name == NULL && !uwsgi.sockets)
if (address == NULL && !uwsgi.sockets)
return;

if (!socket_name) {
socket_name = uwsgi.sockets->name;
if (!address) {
address = uwsgi.sockets->name;
}

struct uwsgi_buffer *ub = uwsgi_subscription_ub(key, keysize, modifier1, modifier2, cmd, socket_name, sign, sni_key, sni_crt, sni_ca);
struct uwsgi_buffer *ub = uwsgi_subscription_ub(key, keysize, modifier1, modifier2, cmd, address, sign, sni_key, sni_crt, sni_ca);

if (!ub)
return;
Expand All @@ -771,8 +776,8 @@ void uwsgi_send_subscription_from_fd(int fd, char *udp_address, char *key, size_
}


void uwsgi_send_subscription(char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
uwsgi_send_subscription_from_fd(-1, udp_address, key, keysize, modifier1, modifier2, cmd, socket_name, sign, sni_key, sni_crt, sni_ca);
void uwsgi_send_subscription(char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *address, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
uwsgi_send_subscription_from_fd(-1, udp_address, key, keysize, modifier1, modifier2, cmd, address, sign, sni_key, sni_crt, sni_ca);
}

#ifdef UWSGI_SSL
Expand Down Expand Up @@ -880,27 +885,43 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
int keysize = 0;
char *modifier1 = NULL;
int modifier1_len = 0;
char *socket_name = NULL;
char *address = NULL;
char *address_resolved = NULL;
char *udp_address = subscription;
char *udp_port = NULL;
char *subscription_key = NULL;
char *sign = NULL;

// check for explicit socket_name
// Check for an explicit address. This may be a reference to a shared
// socket i.e. "=0=REST", a unix socket "/path/to/socket.sock=REST",
// or a udp socket "server-address:port=REST"
char *equal = strchr(subscription, '=');
if (equal) {
socket_name = subscription;
if (socket_name[0] == '=') {
equal = strchr(socket_name + 1, '=');
address = subscription;
if (address[0] == '=') {
equal = strchr(address + 1, '=');
if (!equal)
return;
*equal = '\0';
struct uwsgi_socket *us = uwsgi_get_shared_socket_by_num(atoi(socket_name + 1));
struct uwsgi_socket *us = uwsgi_get_shared_socket_by_num(atoi(address + 1));
if (!us)
return;
socket_name = us->name;
address = us->name;
}
*equal = '\0';

char *address_port = strchr(address, ':');
if (address_port) {
*address_port = '\0';
char *resolved = uwsgi_resolve_ip(address);
*address_port = ':';

if (!resolved)
return;

address_resolved = uwsgi_concat2n(resolved, strlen(resolved), address_port, equal - address_port);
}

udp_address = equal + 1;
}

Expand Down Expand Up @@ -943,7 +964,7 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
modifier1_len = strlen(modifier1);
keysize = strlen(key);
}
uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, address_resolved ? address_resolved : address, sign, NULL, NULL, NULL);
modifier1 = NULL;
modifier1_len = 0;
}
Expand All @@ -961,7 +982,7 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
modifier1_len = strlen(modifier1);
keysize = strlen(key);
}
uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, address_resolved ? address_resolved : address, sign, NULL, NULL, NULL);
modifier1 = NULL;
modifier1_len = 0;
lines[i] = '\n';
Expand Down Expand Up @@ -990,7 +1011,7 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
modifier1_len = strlen(modifier1);
}

uwsgi_send_subscription(udp_address, subscription_key + 1, strlen(subscription_key + 1), uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
uwsgi_send_subscription(udp_address, subscription_key + 1, strlen(subscription_key + 1), uwsgi_str_num(modifier1, modifier1_len), 0, cmd, address_resolved ? address_resolved : address, sign, NULL, NULL, NULL);
if (modifier1)
modifier1[-1] = ',';
if (sign)
Expand All @@ -1001,6 +1022,7 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
if (equal)
*equal = '=';
free(udp_address);
free(address_resolved);

}

Expand All @@ -1010,6 +1032,7 @@ void uwsgi_subscribe2(char *arg, uint8_t cmd) {
char *s2_key = NULL;
char *s2_socket = NULL;
char *s2_addr = NULL;
char *s2_addr_resolved = NULL;
char *s2_weight = NULL;
char *s2_sign = NULL;
char *s2_modifier1 = NULL;
Expand Down Expand Up @@ -1056,7 +1079,18 @@ void uwsgi_subscribe2(char *arg, uint8_t cmd) {
modifier2 = atoi(s2_modifier2);
}

uwsgi_send_subscription(s2_server, s2_key, strlen(s2_key), modifier1, modifier2, cmd, s2_addr, s2_sign, s2_sni_key, s2_sni_crt, s2_sni_ca);
if (s2_addr) {
char *port = strchr(s2_addr, ':');
if (port) {
*port = '\0';
char *resolved = uwsgi_resolve_ip(s2_addr);
if (!resolved) goto end;
*port = ':';
s2_addr_resolved = uwsgi_concat2n(resolved, strlen(resolved), port, strlen(port));
}
}

uwsgi_send_subscription(s2_server, s2_key, strlen(s2_key), modifier1, modifier2, cmd, s2_addr_resolved ? s2_addr_resolved : s2_addr, s2_sign, s2_sni_key, s2_sni_crt, s2_sni_ca);
end:
if (s2_server)
free(s2_server);
Expand All @@ -1066,6 +1100,8 @@ void uwsgi_subscribe2(char *arg, uint8_t cmd) {
free(s2_socket);
if (s2_addr)
free(s2_addr);
if (s2_addr_resolved)
free(s2_addr_resolved);
if (s2_weight)
free(s2_weight);
if (s2_modifier1)
Expand Down

0 comments on commit d346e1e

Please sign in to comment.