Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEW] Light-weight cluster bus pubsub message #557

Closed
zuiderkwast opened this issue May 26, 2024 · 5 comments · Fixed by #654
Closed

[NEW] Light-weight cluster bus pubsub message #557

zuiderkwast opened this issue May 26, 2024 · 5 comments · Fixed by #654
Assignees
Labels
cluster major-decision-approved Major decision approved by TSC team

Comments

@zuiderkwast
Copy link
Contributor

zuiderkwast commented May 26, 2024

The problem/use-case that the feature addresses

In cluster mode, propagating a pubsub message between cluster nodes over the cluster bus has a large overhead, since every cluster bus message includes a lot of node information, such as a slot bitmap (2KB).

Description of the feature

Introduce a new cluster bus message type that shares the first elements of clusterMsg, so clusterReadHandler() and clusterIsValidPacket() can still be called on it, before branching by type. These are called on all incoming cluster bus messages. (port is not checked here, but it's squeezed in before type.)

    char sig[4];                  /* Signature "RCmb" (Cluster message bus). */
    uint32_t totlen;              /* Total length of this message */
    uint16_t ver;                 /* Protocol version, currently set to CLUSTER_PROTO_VER. */
    uint16_t port;                /* Primary port number (TCP or TLS). */
    uint16_t type;                /* Message type */

In clusterProcessPacket(), just after calling clusterIsValidPacket(), we add a check for type and branch out for the new lightweight pubsub message, if the type field indicates our new message type. We should update the sender->data_received timestamp though.

I hope we can avoid including the sender node ID too (40B). If it's the first message after reconnecting, the receiving node needs to validate that the message is from a known node in the cluster, but if we can require that it's never the first message after (re)connecting, we can skip the node id too.

Edit: It appears we can rely on PING or MEET always being sent after connect. I found this in clusterLinkConnectHandler:

    /* Queue a PING in the new connection ASAP: this is crucial
     * to avoid false positives in failure detection.
     *
     * If the node is flagged as MEET, we send a MEET message instead
     * of a PING one, to force the receiver to add us in its node
     * table. */

The payload should allow multiple messages for the same channel without repeating the channel name (for #525 but possibly also for batching in other ways). For messages in multiple channels, it may be OK to send a separate cluster bus message for each channel since the overhead is low (can be discussed).

Feature check: We should use a bit in clusterMsg, e.g. a cluster node flag, to indicate support for lightweight pubsub messages. A node only sends it to another node which is known to support it.

Alternatives you've considered

Additional information

This doesn't exclude the possibility of also doing #307 for sharded pubsub. The benefit of using the replication stream is to prevent the cluster bus from being overloaded by pubsub traffic.

@zuiderkwast zuiderkwast moved this to Researching in Roadmap May 26, 2024
@PingXie
Copy link
Member

PingXie commented May 26, 2024

I agree with the overall direction of this proposal. I think the lighter clusterMsg can be used for scenarios beyond sharded pubsub, such as shard-id and forgotten nodes broadcasting as well.

@zuiderkwast
Copy link
Contributor Author

We discussed this in the last core team meeting and agreed on the details.

@madolson madolson removed this from Roadmap Jun 6, 2024
@roshkhatri
Copy link
Member

roshkhatri commented Jun 11, 2024

Hi, I am currently implementing a new header struct whici looks something like this,

typedef struct {
    char sig[4];                  /* Signature "RCmb" (Cluster message bus). */
    uint32_t totlen;              /* Total length of this message */
    uint16_t ver;                 /* Protocol version, currently set to CLUSTER_PROTO_VER. */
    uint16_t port;                /* Primary port number (TCP or TLS). */
    uint16_t type;                /* Message type */
    char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
    union clusterMsgData data;
} clusterMsgPubsub;

Trying to figure the changes in clusterReadHandler() to read from conn as clusterMsgPubsub if possible.
Will open a draft PR for this later.

@zuiderkwast
Copy link
Contributor Author

I don't think clusterReadHandler() needs any changes at all. It only accesses hdr->sig and hdr->tolen so as long as those are at the same place as in a clusterMsg, it's fine that the read handler thinks that it is a clusterMsg.

Same with clusterIsValidPacket(). It only accesses totlen, type and ver, so no changes needed.

AFAICT, the only place we need to hook in our check for type is in clusterProcessPacket(), just after clusterIsValidPacket() and before any fields that don't exist, like hrd->flags are accessed. When we know the type, we can cast it to our new struct:

int clusterProcessPacket(clusterLink *link) {
    /* Validate that the packet is well-formed */
    if (!clusterIsValidPacket(link)) return 1;

    clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
    uint16_t type = ntohs(hdr->type);
    if (type == CLUSTER_PUBSUB_LIGHT) {
        clusterPubsubLight *light_hdr = (clusterPubsubLight *)hdr;
        return clusterProcessPubsubLight(...);
    }

    ...
}

Btw, we don't need to include sender in the message either, so we can save some more bytes. This is never the first message on any connection, so the sender is already stored in the link object as sender = link->node;. We can use a modified version of getNodeFromLinkAndMsg() that only takes it from link and not from msg. (We don't even need a function, so view this a pseudo code.)

/* Use only when you know this is not the first message on the link. */
static clusterNode *getNodeFromLink(clusterLink *link) {
    assert(link->node && !nodeInHandshake(link->node));
    return link->node;
}

@roshkhatri
Copy link
Member

roshkhatri commented Jun 12, 2024

Thats makes sense! Thanks you so much!

Also I think we will have to make changes in the clusterReadHandler() as it does the following check

if (memcmp(hdr->sig, "RCmb", 4) != 0 || ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) {

Where our new min totlen will be something like CLUSTERMSGPUBSUB_MIN_LEN where we will check for publish type messages

@madolson madolson moved this to Todo in Valkey 8.0 Jun 17, 2024
@madolson madolson moved this from Todo to In Progress in Valkey 8.0 Jun 17, 2024
@madolson madolson removed this from Valkey 8.0 Jul 15, 2024
PingXie pushed a commit that referenced this issue Jul 26, 2024
Adds light-weight cluster bus header for pubsub message. Closes #557.

This also supports sending to and receiving non-light messages from
older versions of the engine.

The light-weight cluster bus message supports multiple pubsub messages
(payloads) for one pubsub channel. Receiving messages with multiple
payloads is supported but we're not yet sending such multi-payload
messages to other nodes.

---------

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
cluster major-decision-approved Major decision approved by TSC team
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants