Skip to content

Commit

Permalink
* FIX [exchange] Check pair0_sock before send msg.
Browse files Browse the repository at this point in the history
Signed-off-by: wangha <wanghamax@gmail.com>
  • Loading branch information
wanghaEMQ committed Jan 24, 2025
1 parent d102415 commit 8ba53f2
Showing 1 changed file with 7 additions and 1 deletion.
8 changes: 7 additions & 1 deletion src/mqtt/protocol/exchange/exchange_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -464,13 +464,18 @@ static void query_send_async(exchange_sock_t *s, struct cmd_data *cmd_data)
if (parquet_datas[i] != NULL) {
struct stream_decoded_data *parquet_decoded_data = NULL;

log_info("decode start");
parquet_decoded_data = stream_decode(s->ex_node->ex->streamType, parquet_datas[i]);
log_info("decode end");
if (parquet_decoded_data != NULL && parquet_decoded_data->len > 0) {
log_info("parquet_decoded_data[%d] size: %d", i, parquet_decoded_data->len);
nng_msg *newmsg = NULL;
nng_msg_alloc(&newmsg, 0);
nni_msg_append(newmsg, parquet_decoded_data->data, parquet_decoded_data->len);
nng_sendmsg(*(s->pair0_sock), newmsg, 0);
if (s->pair0_sock)
nng_sendmsg(*(s->pair0_sock), newmsg, 0);
else
log_error("pair0_sock is null!!!!!!!!!");
/* NOTE: sleep 1000ms */
nng_msleep(1000);
stream_decoded_data_free(parquet_decoded_data);
Expand Down Expand Up @@ -630,6 +635,7 @@ exchange_sock_init(void *arg, nni_sock *sock)
nni_mtx_init(&s->mtx);
nni_id_map_init(&s->rbmsgmap, 0, 0, true);
s->isBusy = false;
s->pair0_sock = NULL;

nni_lmq_init(&s->lmq, 256);

Expand Down

0 comments on commit 8ba53f2

Please sign in to comment.