Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Possibly not trappable error #692

Closed
iccicci opened this issue Jun 16, 2022 · 13 comments
Closed

Possibly not trappable error #692

iccicci opened this issue Jun 16, 2022 · 13 comments

Comments

@iccicci
Copy link

iccicci commented Jun 16, 2022

Hi all,

I'm working on a project using amqp v0.10.0, my implementation seems to work fine but...

My unit tests (using jest) unpredictably fail about 10% - 15% of times with following error:

    Channel ended, no reply will be forthcoming

      at rej (../../node_modules/amqplib/lib/channel.js:200:7)
      at Channel.Object.<anonymous>.C._rejectPending (../../node_modules/amqplib/lib/channel.js:202:28)
      at Channel.Object.<anonymous>.C.toClosed (../../node_modules/amqplib/lib/channel.js:170:8)
      at Channel.accept (../../node_modules/amqplib/lib/channel.js:189:12)
      at Connection.mainAccept (../../node_modules/amqplib/lib/connection.js:63:33)
      at Socket.go (../../node_modules/amqplib/lib/connection.js:486:48)

I found issue 250, ok, probably I'm doing something wrong in my strongly asynchronous flow, but...

I'm using Promises interface, I nested all the calls (connect, connection.createChannel, channel.assertQueue, channel.prefetch, channel.consume) inside try / catch blocks, I 'm listening for the error event both on connection and on channel, I'm logging all the call results, but I'm unable to trap the error.

At the end I was forced to implement a workaround inspired to this one and my unit tests now no longer fail, never!

Inspecting the logs produced by my unit tests, I would say that all the operations are performed in the correct order, but moreover I noticed that when the conditions are met to let the workaround to play its role, the test itself do not fails, while the next one neither starts and jest reports it as the failing one.

Since I'm trapping and logging everything more what I noticed from the inspection of my logs, I strongly suspect that the given error can't be trapped and amqp raises it in a way that it goes in unhandledException or unhandledRejection.

So, the core of this issue is: are we sure that the given error is thrown in a way it can be trapped the the amqp caller?

Thank you

@cressie176
Copy link
Collaborator

Hi @iccicci,

The way the amqp protocol works is that some commands have a reply while others do not. For example, declaring a queue or exchange has an equivalent declare-ok response from the broker. Other commands such as publishing a message using a regular, rather than a confirm channel do not have a response.

When you issue a command that has a reply, amqplib will temporarily stash a promise, to be resolved when the reply arrives. However, if a channel closes before the reply arrives, then the it will reject the pending promises with the "Channel ended, no reply will be forthcomming" message.

Is it possible you are getting this message because your tests are not waiting for all work to complete before tearing down the connection or channels?

Alternatively, is Jest running tests in parallel that could be interfering with one another?

One way to better understand what is happening is to use wireshark to watch the traffic between the application and the broker. It's most likely when the tests work, you'll see a matching set of commands + replies, but when the tests fail, you'll see which commands did not receive replies, and this may help you work out what code was still running just before the channel was closed.

As to amqplib trapping errors - this is possible if your connection / channel handlers throw errors synchronously. See here for a description of this particular problem

@cressie176
Copy link
Collaborator

The following demonstrates the problem you might be having

const amqplib = require('amqplib');

(async () => {
  const connection = await amqplib.connect();
  connection.on('error', (error) => {
    console.log({ error });
  })

  const channel = await connection.createConfirmChannel();
  channel.on('error', (error) => {
    console.log({ error });
  })

  // Simulate a parallel test closing the channel or connection...
  setImmediate(async () => {
    await channel.close();
  });

  // ...while another test is sending a command that expects a reply
  await channel.assertQueue('q-692', { autoDelete: true })

})();

@cressie176
Copy link
Collaborator

Hi @iccicci,

I'm going to close this as there's nothing to suggest this is a problem with amqplib, but please update if you still need help

@Bezlepkin
Copy link

Same issue :( the catch block does not catch any errors if there is no connection initially.

The code just stops at this point:
const connection: amqp.Connection = await amqp.connect(app.url);

@cressie176
Copy link
Collaborator

Have you set a connection timeout?

@Bezlepkin
Copy link

@cressie176 no...

@cressie176
Copy link
Collaborator

Try this

const connection: amqp.Connection = await amqp.connect(app.url, { timeout: timeoutInMillis });

@Bezlepkin
Copy link

@cressie176

public async run(): Promise<void> {
    try {
      const connection: amqp.Connection = await amqp.connect(app.url, { timeout: 2000 });
  
      connection.on('error', () => {
        console.log('connection error'); // never works
        this._connection = false;
      });
  
      connection.on('close', () => {
        console.log('connection close'); // it works when I shut down the RabbitMQ server.
        this._connection = false;
      });
  
      const queue = app.queueName;
      const channel = await connection.createChannel();
  
      await channel.assertQueue(queue);
  
      this._connection = true;
  
      channel.consume(queue, async (msg: ConsumeMessage | null): Promise<void> => {
        if (!msg) {
          logger.warn('Consumer cancelled by server');
          return;
        }
  
        if (await this.handleQueueMessage(msg)) {
          channel.ack(msg);
        }
      });
    } catch(e) {
      console.log(e); // there's nothing in the catch either.
    }
  }

@cressie176
Copy link
Collaborator

Looks good at a glance. There are a few of things I would consider in addition...

  • Set an error handler on the channel
  • Closing the channel & connection (in that order) when you no longer need them

Next step to debug is wireshark. This will show you the traffic between the client and broker.

https://www.rabbitmq.com/amqp-wireshark.html

@Bezlepkin
Copy link

@cressie176

I have a problem that I can't get an error when connecting to a shutdown RabitMQ server...

@Bezlepkin
Copy link

@cressie176 I'm currently testing this in the NestsJS application.

The application does not run if the RabbitMQ server is disabled.

It's happening at this very point:
await amqp.connect(this._options.url, { timeout: 30000 });

@cressie176
Copy link
Collaborator

I've tried the following script with node v18.3.0 on macbook...

const amqp = require('amqplib');

(async () => {
  try {
    const connection = await amqp.connect('amqp://127.0.0.1', { timeout: 5000 });

    connection.on('error', () => {
      console.log('connection error'); // never works
    });

    connection.on('close', () => {
      console.log('connection close'); // it works when I shut down the RabbitMQ server.
    });

    const queue = 'q-692'
    const channel = await connection.createChannel();

    await channel.assertQueue(queue);

    await channel.consume(queue, async (msg) => {
      if (!msg) {
        logger.warn('Consumer cancelled by server');
        return;
      }

      channel.ack(msg);
    });

    console.log('OK')

  } catch(e) {
    console.log('Caught', e); // there's nothing in the catch either.
  }
})();

Terminal

steve@local amqplib-692 % docker pause rabbitmq
steve@local amqplib-692 % node index.js                 
Caught Error: connect ETIMEDOUT
    at Socket.<anonymous> (/Users/steve/Development/amqp-node/amqplib-692/node_modules/amqplib/lib/connect.js:178:20)
    at Object.onceWrapper (node:events:641:28)
    at Socket.emit (node:events:527:28)
    at Socket._onTimeout (node:net:523:8)
    at listOnTimeout (node:internal/timers:564:17)
    at process.processTimers (node:internal/timers:507:7)

steve@local amqplib-692 % docker stop rabbitmq
steve@local amqplib-692 % node index.js             
Caught Error: connect ECONNREFUSED 127.0.0.1:5672
    at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1229:16) {
  errno: -61,
  code: 'ECONNREFUSED',
  syscall: 'connect',
  address: '127.0.0.1',
  port: 5672
}

Is there anything significantly different about your environment? Can you try the above locally?

@Bezlepkin
Copy link

@cressie176 You're right, it works on the locale. Thank you so much! I'll look for the problem....

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants