Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

handleMessage does not allow for an injected process method #403

Closed
ltyiz07 opened this issue May 24, 2023 · 3 comments · Fixed by #405
Closed

handleMessage does not allow for an injected process method #403

ltyiz07 opened this issue May 24, 2023 · 3 comments · Fixed by #405
Milestone

Comments

@ltyiz07
Copy link

ltyiz07 commented May 24, 2023

Describe the bug

Assign a call-back function at "handleMessage" at "Consumer.create(options: ConsumerOptions)"
with "ConsumerOptions.handleMessage" direct method assign is not working properly.

When I call injected services's method at inside of "process" method that I set as "handleMessage", it is not called properly.
Just pending everything without any error or messages.
But, If I call injected service's method at lambda function assigned to "handleMessge", it works properly.

Not working below code.

import { Message, SQSClient } from '@aws-sdk/client-sqs';
import { Inject, Injectable } from '@nestjs/common';
import { Consumer } from 'sqs-consumer';
import { Producer } from 'sqs-producer';
import { IJobConsumer } from './interfaces/job-queue.consumer.interface';
import { IJobQueueModuleOptions, JOB_QUEUE_OPTIONS } from './interfaces/job-queue.options.interface';
import { IJobQueueService } from './interfaces/job-queue.service.interface';

@Injectable()
export class JobQueueService implements IJobQueueService {

    private readonly sqsClient: SQSClient;
    private producer: Producer;
    private consumer: Consumer;

    constructor(
        @Inject(JOB_QUEUE_OPTIONS) private readonly options: IJobQueueModuleOptions,
    ) {

        this.sqsClient = new SQSClient({
            region: this.options.queueConnectionConfigs.sqsRegion,
            credentials: {
                accessKeyId: this.options.queueConnectionConfigs.sqsAccessKeyId,
                secretAccessKey: this.options.queueConnectionConfigs.sqsSecretAccessKey,
            },
        });

        this.producer = Producer.create({
            queueUrl: this.options.queueConnectionConfigs.url,
            sqs: this.sqsClient,
        });

    }

    async produce(messages: any): Promise<any> {
        console.log('send message: ', messages);
        const result = await this.producer.send(messages);
        return result;
    }

    setConsumer(consumer: IJobConsumer, options?: any): void {
        this.consumer = Consumer.create({
            queueUrl: this.options.queueConnectionConfigs.url,
            sqs: this.sqsClient,
            visibilityTimeout: 30,
            heartbeatInterval: 10,
            // handleMessage: async (msg: Message): Promise<Message | void>  => {
                // return await consumer.process(msg);
            // },
            handleMessage: consumer.process,
        })
        return this.consumer.start();
    }

    getJobQueueName(): string {
        return this.options.jobQueueName;
    }
}

but below code is working as expected

import { Message, SQSClient } from '@aws-sdk/client-sqs';
import { Inject, Injectable } from '@nestjs/common';
import { Consumer } from 'sqs-consumer';
import { Producer } from 'sqs-producer';
import { IJobConsumer } from './interfaces/job-queue.consumer.interface';
import { IJobQueueModuleOptions, JOB_QUEUE_OPTIONS } from './interfaces/job-queue.options.interface';
import { IJobQueueService } from './interfaces/job-queue.service.interface';

@Injectable()
export class JobQueueService implements IJobQueueService {

    private readonly sqsClient: SQSClient;
    private producer: Producer;
    private consumer: Consumer;

    constructor(
        @Inject(JOB_QUEUE_OPTIONS) private readonly options: IJobQueueModuleOptions,
    ) {

        this.sqsClient = new SQSClient({
            region: this.options.queueConnectionConfigs.sqsRegion,
            credentials: {
                accessKeyId: this.options.queueConnectionConfigs.sqsAccessKeyId,
                secretAccessKey: this.options.queueConnectionConfigs.sqsSecretAccessKey,
            },
        });

        this.producer = Producer.create({
            queueUrl: this.options.queueConnectionConfigs.url,
            sqs: this.sqsClient,
        });

    }

    async produce(messages: any): Promise<any> {
        console.log('send message: ', messages);
        const result = await this.producer.send(messages);
        return result;
    }

    setConsumer(consumer: IJobConsumer, options?: any): void {
        this.consumer = Consumer.create({
            queueUrl: this.options.queueConnectionConfigs.url,
            sqs: this.sqsClient,
            visibilityTimeout: 30,
            heartbeatInterval: 10,
            handleMessage: async (msg: Message): Promise<Message | void>  => {
                return await consumer.process(msg);
            },
            // handleMessage: consumer.process,
        })
        return this.consumer.start();
    }

    getJobQueueName(): string {
        return this.options.jobQueueName;
    }
}

Your minimal, reproducible example

import { Message, SQSClient } from '@aws-sdk/client-sqs'; import { Inject, Injectable } from '@nestjs/common'; import { Consumer } from 'sqs-consumer'; import { Producer } from 'sqs-producer'; import { IJobConsumer } from './interfaces/job-queue.consumer.interface'; import { IJobQueueModuleOptions, JOB_QUEUE_OPTIONS } from './interfaces/job-queue.options.interface'; import { IJobQueueService } from './interfaces/job-queue.service.interface'; @Injectable() export class JobQueueService implements IJobQueueService { private readonly sqsClient: SQSClient; private producer: Producer; private consumer: Consumer; constructor( @Inject(JOB_QUEUE_OPTIONS) private readonly options: IJobQueueModuleOptions, ) { this.sqsClient = new SQSClient({ region: this.options.queueConnectionConfigs.sqsRegion, credentials: { accessKeyId: this.options.queueConnectionConfigs.sqsAccessKeyId, secretAccessKey: this.options.queueConnectionConfigs.sqsSecretAccessKey, }, }); this.producer = Producer.create({ queueUrl: this.options.queueConnectionConfigs.url, sqs: this.sqsClient, }); } async produce(messages: any): Promise { console.log('send message: ', messages); const result = await this.producer.send(messages); return result; } setConsumer(consumer: IJobConsumer, options?: any): void { this.consumer = Consumer.create({ queueUrl: this.options.queueConnectionConfigs.url, sqs: this.sqsClient, visibilityTimeout: 30, heartbeatInterval: 10, handleMessage: async (msg: Message): Promise<Message | void> => { return await consumer.process(msg); }, // handleMessage: consumer.process, }) return this.consumer.start(); } getJobQueueName(): string { return this.options.jobQueueName; } }

Steps to reproduce

  1. Import job-queue module with its options
  2. Using job-queue modules method "setConsumer" set consumer class that implements IJobConsumer
  3. At Job-queue module's "setConsumer" method, set handleMessage with consumer's process method

Expected behavior

As a user, I expect injected service's method works properly even if I assign it to "handleMessage" directly.

How often does this bug happen?

Every time

Screenshots or Videos

No response

Platform

  • windows 10
  • node v18.14.2
  • @nestjs/sqs-producer: ^2.0.0
  • @aws-sdk/client-sqs: ^3.332.0

Package version

7.1.0

AWS SDK version

3.332.0

Additional context

No response

@nicholasgriffintn nicholasgriffintn added feature-request feature request and removed bug labels May 24, 2023
@nicholasgriffintn nicholasgriffintn changed the title [Bug]: handleMessage assign bug. handleMessage does not allow for an injected process method May 24, 2023
@nicholasgriffintn
Copy link
Member

nicholasgriffintn commented May 24, 2023

This definitely isn't a bug and is more a difference between your implementation and how we expect it to be implemented, the second example is the solution we have built for.

Will remove the bug label as this is not a bug. Happy to take a PR if you'd like this capability.

@nicholasgriffintn
Copy link
Member

This has been pre released to v7.2.0-canary.3, will be released to v7.2.0, once I've had a chance to validate all of the changes.

@github-actions
Copy link

github-actions bot commented Jul 9, 2023

This issue has been closed for more than 30 days. If this issue is still occuring, please open a new issue with more recent context.

@github-actions github-actions bot locked as resolved and limited conversation to collaborators Jul 9, 2023
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants