diff --git a/src/supplemental/mqtt/mqtt_msg.c b/src/supplemental/mqtt/mqtt_msg.c index a1dccb435..5141fb53b 100644 --- a/src/supplemental/mqtt/mqtt_msg.c +++ b/src/supplemental/mqtt/mqtt_msg.c @@ -1024,7 +1024,7 @@ mqtt_close_unack_aio_cb(void *key, void *val) nni_aio * aio = val; if (aio) { - nni_aio_finish_error(aio, NNG_ECLOSED); + nni_aio_finish_sync(aio, 0, NNG_ECLOSED); nni_msg_free(nni_aio_get_msg(aio)); nni_aio_set_msg(aio, NULL); nni_aio_set_prov_data(aio, NULL); diff --git a/src/supplemental/mqtt/mqtt_public.c b/src/supplemental/mqtt/mqtt_public.c index 99519429f..202a5cb64 100644 --- a/src/supplemental/mqtt/mqtt_public.c +++ b/src/supplemental/mqtt/mqtt_public.c @@ -841,11 +841,14 @@ nng_mqtt_client_send_cb(void* arg) { nng_mqtt_client *client = (nng_mqtt_client *) arg; nng_aio * aio = client->send_aio; - nng_msg * msg = nng_aio_get_msg(aio); nng_msg * tmsg = NULL; nni_lmq * lmq = (nni_lmq *)client->msgq; - + // in case of data conention while fini pipes + if (nng_aio_result(aio) == NNG_ECLOSED) + return; + + nng_msg * msg = nng_aio_get_msg(aio); if (msg == NULL || nng_aio_result(aio) != 0) { client->cb(client, NULL, client->obj); return;