Potentially unhandled rejection [1] Error: Channel ended, no reply will be forthcoming #250

Closed
cboden opened this issue May 18, 2016 · 22 comments

Comments

@cboden

cboden commented May 18, 2016

I'm receiving this error:

Potentially unhandled rejection [1] Error: Channel ended, no reply will be forthcoming
at rej (node_modules/amqplib/lib/channel.js:190:7)
at Channel.C._rejectPending (node_modules/amqplib/lib/channel.js:192:28)
at Channel.C.toClosed (node_modules/amqplib/lib/channel.js:160:8)
at Connection.C.closeChannels (node_modules/amqplib/lib/connection.js:392:18)
at Connection.C.toClosed (node_modules/amqplib/lib/connection.js:399:8)
at Connection.accept (node_modules/amqplib/lib/connection.js:378:12)
at Socket.go (node_modules/amqplib/lib/connection.js:476:48)
at emitNone (events.js:86:13)
at Socket.emit (events.js:185:7)
at emitReadable (_stream_readable.js:438:10)

This is caused by a race condition when the following calls are made in this order:

  • channel.cancel(consumerTag)
  • connection.close()

The connection ends up closing before cancel can be called on the channel. The error triggers because this promise is rejected before the return statement happens 2 lines later.

I'm not sure of the correct action to take here. I attempted to wrap the call to _rpc in a nextTick, but that just seemed to shift the error. So far I've had the most success with putting:

when.Promise.onPotentiallyUnhandledRejection = function() {};

At the top of channel_model.js -- again, I'm not sure if that's the correct solution to this problem.

@cboden
Author

cboden commented May 24, 2016

In a different script I'm receiving the same error. I take the following steps:

  • open a connection
  • create a channel
  • publish a message
  • set a delay of 1500 seconds (have confirmed message is published to broker)
  • attempt to close the channel via channel.close()

This time there is no race condition; the error just happens when I attempt to close the channel.

@squaremo
Collaborator

You can synchronise on the promise returned from channel.cancel, or if that's in its own thread of control, on the promise returned from channel.close (which will wait for any incomplete operations).
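
The ordering described above could be sketched roughly as follows. This is only an illustration, assuming a channel and connection obtained from amqplib's promise API; `stopConsuming` is a hypothetical helper name and not part of the library.

```javascript
// Hypothetical helper (not an amqplib API): tear down a consumer in an order
// that avoids closing the connection while a cancel is still in flight.
// `connection` and `channel` are assumed to come from amqplib's promise API.
async function stopConsuming(connection, channel, consumerTag) {
  // Wait for the broker's reply to the cancel before doing anything else.
  await channel.cancel(consumerTag);
  // Wait for the channel's closing handshake to finish.
  await channel.close();
  // Only now close the connection.
  await connection.close();
}
```

The point is simply that each step waits on the promise returned by the previous one, so the connection is never closed while the channel is still expecting a reply.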

@christensson

@cboden Did you find any solution? I'm seeing the same error in 0.5.1 when doing the following:

await channel.close();
await connection.close();
unhandled promise rejection { Error: Channel ended, no reply will be forthcoming
    at rej (/home/qmachri/dev/secor/src/lib/msgbus/node_modules/amqplib/lib/channel.js:190:7)
    at Channel.C._rejectPending (/home/qmachri/dev/secor/src/lib/msgbus/node_modules/amqplib/lib/channel.js:192:28)
    at Channel.C.toClosed (/home/qmachri/dev/secor/src/lib/msgbus/node_modules/amqplib/lib/channel.js:160:8)
    at Channel.accept (/home/qmachri/dev/secor/src/lib/msgbus/node_modules/amqplib/lib/channel.js:179:12)
    at Connection.mainAccept [as accept] (/home/qmachri/dev/secor/src/lib/msgbus/node_modules/amqplib/lib/connection.js:63:33)
    at Socket.go (/home/qmachri/dev/secor/src/lib/msgbus/node_modules/amqplib/lib/connection.js:476:48)
    at emitNone (events.js:86:13)
    at Socket.emit (events.js:185:7)
    at emitReadable_ (_stream_readable.js:432:10)
    at emitReadable (_stream_readable.js:426:7)
    at readableAddChunk (_stream_readable.js:187:13)
    at Socket.Readable.push (_stream_readable.js:134:10)
    at TCP.onread (net.js:551:20)

@ghost

ghost commented Mar 17, 2017

Just in case someone is still struggling, I have the code below working fine:

async function sendMessage() {
  const conn = await amqp.connect(rabbitMQUrl);
  const channel = await conn.createChannel();
  await channel.assertExchange(exchangeName, exchangeType, {durable: true});
  // publish() returns a boolean, not a promise, so there is nothing to await here
  channel.publish(exchangeName, routingKey, Buffer.from(message));
  await channel.close();
  await conn.close();
}
sendMessage();

@0Ams

0Ams commented Nov 27, 2018

@waubau I used await for connection.close(), but I get the same error.

@Faulknerd

Bumping. Same error as @OhEn

@recursivefunk

I was facing this error as well. Turns out, I simply wasn't using await for my queue assertion

- channel.assertQueue(q, { durable: true, exclusive: false });
+ await channel.assertQueue(q, { durable: true, exclusive: false });

@BrOrlandi

I was facing this error as well. Turns out, I simply wasn't using await for my queue assertion

- channel.assertQueue(q, { durable: true, exclusive: false });
+ await channel.assertQueue(q, { durable: true, exclusive: false });

The same worked for me.

@robross0606

robross0606 commented Mar 8, 2021

We're seeing this issue even now, 4 years after initial report.

  console.error
    Error: Channel ended, no reply will be forthcoming
        at rej (C:\Code\project\node_modules\amqplib\lib\channel.js:201:7)
        at ConfirmChannel.Object.<anonymous>.C._rejectPending (C:\Code\project\node_modules\amqplib\lib\channel.js:203:28)
        at ConfirmChannel.Object.<anonymous>.C.toClosed (C:\Code\project\node_modules\amqplib\lib\channel.js:171:8)
        at ConfirmChannel.accept (C:\Code\project\node_modules\amqplib\lib\channel.js:190:12)
        at Connection.mainAccept [as accept] (C:\Code\project\node_modules\amqplib\lib\connection.js:64:33)
        at Socket.go (C:\Code\project\node_modules\amqplib\lib\connection.js:478:48)
        at Socket.emit (events.js:315:20)
        at emitReadable_ (internal/streams/readable.js:569:12)
        at processTicksAndRejections (internal/process/task_queues.js:79:21)

I've gone over our code numerous times to ensure everything is being await'ed.

@klkvsk

klkvsk commented Jul 28, 2021

I used a dirty workaround to fix this:

function waitForPending(channel) {
    const intervalTime = 100;
    let timeout = 5000;
    return new Promise(resolve => {
        if (channel.pending.length === 0 && channel.reply === null) {
            resolve();
        } else {
            const interval = setInterval(() => {
                if (channel.pending.length === 0 && channel.reply === null) {
                    // everything is clean
                    clearInterval(interval);
                    resolve();
                }
                timeout -= intervalTime;
                if (timeout <= 0) {
                    // timed out, but you probably still want to close it, so resolve()
                    clearInterval(interval);
                    resolve();
                }
            }, intervalTime);
        }
    })
}

Also note that after channel.close() there is a ChannelCloseOk command being awaited in channel.reply, so you wait again before closing the connection:

await waitForPending(channel);
await channel.close();
await waitForPending(channel);
await connection.close();

This is not bulletproof, I guess, and using setInterval is dumb, but I have not found any events I can listen to monitor pending and reply.

@aislanmaia

Seeing the same problem just because of a channel.close() after publishing to a queue...

@adeisbright

I had the same issue.
I delayed closing the connection after publishing a message by doing the following:
setTimeout(async () => {
  await connect.close(); // connect is my connection variable
}, 1500);

@cressie176
Collaborator

Whenever I see similar "no reply will be forthcoming" errors, it always ends up being a concurrency issue with my own code. Typically what happens is that I am asynchronously acknowledging messages, then something closes the channel - either because of an error or through deliberate action on my part. Any attempt to use the channel after this happens by one of the other asynchronous pieces of work can result in this error. The solution is to be very careful about how you control the use of shared channels.
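
One possible way to guard against late acknowledgements is sketched below. It assumes only that the channel is an EventEmitter that emits 'close' and 'error' (as amqplib channels do) and has an `ack` method; `guardChannel` is a hypothetical helper, not something amqplib provides.

```javascript
// Sketch only: wrap a channel so that late acks are skipped once it has closed.
// `guardChannel` is a hypothetical helper, not an amqplib API.
function guardChannel(channel) {
  let open = true;
  // Mark the channel unusable as soon as it reports that it closed or failed.
  channel.once('close', () => { open = false; });
  channel.on('error', () => { open = false; });
  return {
    isOpen: () => open,
    // Returns true if the ack was sent, false if it was skipped.
    ack(message) {
      if (!open) return false;
      channel.ack(message);
      return true;
    },
  };
}
```

A skipped ack is not lost work as far as the broker is concerned: messages left unacknowledged on a closed channel are requeued, so the handler should be prepared to see them again.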

@bakasmarius

I had the same issue. I delayed closing the connection after publishing a message by doing the following: setTimeout(async () => { await connect.close() //connect is my connection variable } , 1500) ;

Thank you very much - adding a timeout before closing a connection also helped me to solve this error on NestJS in Jest tests after I added RabbitMQ dependencies/functionality.

@mayes243

mayes243 commented Mar 4, 2023

I had the same issue. I hope the following code helps.
Make sure to add "await" before "channel.assertQueue"

PUBLISHER

const amqp = require('amqplib');

async function publishMessage() {
    try {
        const connection = await amqp.connect('amqp://localhost');
        const channel = await connection.createChannel();

        const queueName = 'queue';
        const message = 'Hello, world!';

        await channel.assertQueue(queueName);
        channel.sendToQueue(queueName, Buffer.from(message));

        console.log('Message sent:', message);

        await channel.close();
        await connection.close();
    } catch (error) {
        console.error('Error occurred:', error);
    }
}

publishMessage();

SUBSCRIBER

const amqp = require('amqplib');

async function consumeMessage() {
    try {
        // Connect to RabbitMQ server
        const connection = await amqp.connect('amqp://localhost');
        const channel = await connection.createChannel();

        // Create queue and consume messages
        const queueName = 'queue';
        
        await channel.assertQueue(queueName);
        await channel.consume(queueName, (message) => {
            console.log('Message received:', message.content.toString());
        }, { noAck: true });

        console.log('Waiting for messages...');
    } catch (error) {
        console.error(error);
    }
}

consumeMessage();

@cressie176
Collaborator

Adding an arbitrary timeout is fine if you're writing a throw away script, but I wouldn't want to take this approach for a proper application. A better solution is to ensure you

  1. await any promises / use callbacks correctly (as mentioned by @mayes243)
  2. gracefully manage the shutdown of your application so that
    2.1 You prevent any new work (e.g. by unsubscribing from queues / preventing new inbound HTTP requests)
    2.2 You wait until you have received any outstanding publisher acknowledgements
    2.3 You close all open channels
    2.4 You close the RabbitMQ connection

If you do this you should not receive the "Channel ended, no reply will be forthcoming" error.
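
The shutdown steps above might look something like the sketch below. The `shutdown` function and the shape of its argument are assumptions made for illustration; `cancel`, `waitForConfirms` (on confirm channels) and `close` are the amqplib promise-API methods it relies on.

```javascript
// Sketch of the shutdown order listed above. `shutdown` and its parameter
// names are hypothetical; `channels` is assumed to list every open channel.
async function shutdown({ connection, consumers = [], confirmChannels = [], channels = [] }) {
  // 2.1 Stop new work: cancel each consumer and wait for the broker's reply.
  for (const { channel, consumerTag } of consumers) {
    await channel.cancel(consumerTag);
  }
  // 2.2 Wait for outstanding publisher acknowledgements (confirm channels only).
  for (const channel of confirmChannels) {
    await channel.waitForConfirms();
  }
  // 2.3 Close all open channels.
  for (const channel of channels) {
    await channel.close();
  }
  // 2.4 Close the connection last.
  await connection.close();
}
```

This leaves out waiting for message handlers that are still running, and for anything non-RabbitMQ such as an HTTP server; a real application would need to drain those before step 2.3.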

@power-f-GOD

Kindly, does anyone have an idea how I might fix this error (and proceed with my work) in NestJS? I've been unable to do anything and it blocks me. I've tried all the examples above, to no avail.

Please, help.🤲🏼

Cc: @cressie176

@cressie176
Collaborator

cressie176 commented Aug 9, 2023

I'm not familiar with NestJS / Nest's RabbitMQ integration. However, in general if you're getting a channel error, it's usually because you've tried to do something the broker didn't like, e.g. recreate a queue / exchange with different configuration. Another frequent cause is that you've tried to use a channel after it has been closed. Yet another is that you have sent a command to RabbitMQ and are awaiting a reply, but the channel closed before the reply was received. In this last case, you will get the 'Channel ended, no reply will be forthcoming' error. This is especially common when you are sharing a channel and one flow of execution does something which breaks the channel while another is waiting for a reply.

The first thing to do is to fix any incorrect channel usage, e.g.

  • Don't use queues / exchanges which don't exist
  • Don't recreate queues / exchanges with different configuration
  • Don't continue to use a channel after closing it
  • Don't throw errors from the consumer callback (they will trigger a channel error)
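
For the last point, one way to keep exceptions from escaping the consumer callback is sketched here. `safeConsumer` is a hypothetical wrapper, not an amqplib API; it assumes the consumer was started without `noAck`, and that the channel is still open when `ack` or `nack` is called.

```javascript
// Sketch: wrap a message handler so exceptions never escape the consumer callback.
// `safeConsumer` is a hypothetical helper; ack/nack are amqplib channel methods.
function safeConsumer(channel, handler) {
  return async (message) => {
    // amqplib passes null when the broker cancels the consumer.
    if (message === null) return;
    let succeeded = false;
    try {
      await handler(message);
      succeeded = true;
    } catch (err) {
      console.error('handler failed:', err.message);
    }
    if (succeeded) {
      channel.ack(message);
    } else {
      // nack(message, allUpTo, requeue): reject only this message, without requeueing.
      channel.nack(message, false, false);
    }
  };
}
```

It would be passed as the callback, e.g. `channel.consume(queueName, safeConsumer(channel, handleMessage))`, where `handleMessage` is your own function. Rejecting without requeueing is just one choice; whether to requeue or dead-letter depends on the application.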

You will also find things easier to manage if you use a dedicated channel for each consumer. In my applications I go one step further and don't share channels at all, but there is a cost - opening and closing a channel takes a little time and maintaining a larger number of channels increases the number of file handles and memory. I mitigate this by using a channel pool, similar to a database connection pool, but this does add extra complexity and can still be slower than sharing a channel.

The other thing you can do is make sure your code handles channel errors, by replacing channels when they break. This won't prevent the original channel from erroring, but it will at least allow your application to recover. I'm not sure how easy any of this will be with NestJS though.
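
A rough sketch of replacing a broken channel follows. `createChannelHolder` is a hypothetical helper written for this illustration; the only amqplib call it uses is `connection.createChannel()`, plus the 'error' and 'close' events that channels emit.

```javascript
// Sketch: keep one usable channel and open a replacement after the current one closes.
// `createChannelHolder` is a hypothetical helper, not part of amqplib.
function createChannelHolder(connection) {
  let current = null; // promise for the current channel, or null if none is open

  function open() {
    const pending = connection.createChannel().then((channel) => {
      // With no 'error' listener, an emitted 'error' would be thrown as an uncaught exception.
      channel.on('error', (err) => console.error('channel error:', err.message));
      // Forget the channel once it closes so the next get() opens a fresh one.
      channel.once('close', () => {
        if (current === pending) current = null;
      });
      return channel;
    });
    // If opening fails, forget the attempt so the next get() retries.
    pending.catch(() => {
      if (current === pending) current = null;
    });
    current = pending;
    return pending;
  }

  // Callers ask for the channel each time instead of holding on to a reference.
  return { get: () => current || open() };
}
```

Consumers and any queue or exchange assertions made on the old channel are not restored by this sketch; they would have to be set up again on the replacement.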

@power-f-GOD

Hi, @cressie176 . Thanks for your swift response. I didn't realize you had responded because I switched to another project. Noticed a moment ago.

So, yeah, one of the things you said is definitely correct. This particularly: "Yet another is that you have sent a command to RabbitMQ and are awaiting a reply, but the channel closed before the reply was received."

I'm new to RMQ and didn't know this.
I remember that the last time this error occurred, I had run a killall node command in the CLI when switching projects, most probably during an exchange, and I didn't realize until the following day when I wanted to re-run the app.

So, in the case of NestJS, I have actually commented out the code that connects the microservices and, of course, the app starts without the errors. But when I copy and use one of the examples mentioned previously, I still get the error and it doesn't work.

The example:

      ...
      async function setupExchangeWithPromises() {
        try {
          const connection = await amqp.connect(configService.get('RMQ_URI'));
          const channel = await connection.createChannel();

          const exchangeName = 'WS';
          const exchangeType = 'direct';
          const options = { durable: true };

          await channel.assertExchange(exchangeName, exchangeType, options);

          await channel.close();
          await connection.close();
        } catch (error: any) {
          console.error('Error:', error);
        }
      }

      await setupExchangeWithPromises();
      ...

I get a similar error (code):

Error: read ECONNRESET
at TCP.onStreamRead (node:internal/stream_base_commons:217:20) {
  errno: -54,
  code: 'ECONNRESET',
  syscall: 'read'
}

@cressie176
Collaborator

Hi @power-f-GOD,

ECONNRESET means the broker closed the connection. You can find some common causes in the amqplib troubleshooting guide, but it's typically a problem with your configuration rather than your code.

@power-f-GOD

power-f-GOD commented Aug 10, 2023

ECONNRESET means the broker closed the connection...

Hmm... that's strange because it is the same error code I get with the NestJS implementation (without the "forthcoming" message though). Here:

Screenshot 2023-08-10 at 4 08 09 AM

And I double-checked my configuration, and actually, nothing changed on my side:

# RabbitMQ
RMQ_USERNAME=admin
RMQ_PASSWORD=admin
RMQ_PORT=5672
RMQ_URI=amqp://${RMQ_USERNAME}:${RMQ_PASSWORD}@${HOST}:${RMQ_PORT}?heartbeat=15

@power-f-GOD

As all the suggestions above didn't work for me, I took the hard route/path: I (brew) uninstalled and reinstalled RabbitMQ (and the error ceased).

Ref: https://stackoverflow.com/a/69094131/14034888
