Skip to content

Commit

Permalink
feat: bulk issuance implementation using nest queue (#732)
Browse files Browse the repository at this point in the history
* feat: implemented bulk issuance with nest queue

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

* refactor: changed comment message

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>

---------

Signed-off-by: tipusinghaw <tipu.singh@ayanworks.com>
Signed-off-by: KulkarniShashank <shashank.kulkarni@ayanworks.com>
  • Loading branch information
tipusinghaw authored and KulkarniShashank committed Sep 11, 2024
1 parent 987d5e9 commit e28c784
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 23 deletions.
9 changes: 1 addition & 8 deletions apps/api-gateway/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import { UserModule } from './user/user.module';
import { ConnectionModule } from './connection/connection.module';
import { EcosystemModule } from './ecosystem/ecosystem.module';
import { getNatsOptions } from '@credebl/common/nats.config';
import { BullModule } from '@nestjs/bull';
import { CacheModule } from '@nestjs/cache-manager';
import * as redisStore from 'cache-manager-redis-store';
import { WebhookModule } from './webhook/webhook.module';
Expand Down Expand Up @@ -54,13 +53,7 @@ import { NotificationModule } from './notification/notification.module';
UtilitiesModule,
WebhookModule,
NotificationModule,
CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }),
BullModule.forRoot({
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT)
}
})
CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT })
],
controllers: [AppController],
providers: [AppService]
Expand Down
6 changes: 6 additions & 0 deletions apps/issuance/src/issuance.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ import { AwsService } from '@credebl/aws';
]),
CommonModule,
CacheModule.register({ store: redisStore, host: process.env.REDIS_HOST, port: process.env.REDIS_PORT }),
BullModule.forRoot({
redis: {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT)
}
}),
BullModule.registerQueue({
name: 'bulk-issuance'
})
Expand Down
38 changes: 23 additions & 15 deletions apps/issuance/src/issuance.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1084,23 +1084,31 @@ async sendEmailForCredentialOffer(sendEmailCredentialOffer: SendEmailCredentialO
if (!respFile) {
throw new BadRequestException(ResponseMessages.issuance.error.fileData);
}
// ------------------------ Remove after debugging ---------------------------
const queueRunningStatus = await this.bulkIssuanceQueue.isReady();
this.logger.log(`respFile::::::`, respFile);
// eslint-disable-next-line no-console
console.log('queueRunningStatus:::::::::', queueRunningStatus);
// ------------------------ Remove after debugging ---------------------------

for (const element of respFile) {
try {
const payload = {
data: element.credential_data,
fileUploadId: element.fileUploadId,
clientId: clientDetails.clientId,
cacheId: requestId,
credentialDefinitionId: element.credDefId,
schemaLedgerId: element.schemaId,
isRetry: false,
orgId,
id: element.id,
isLastData: respFile.indexOf(element) === respFile.length - 1
};

await this.delay(500); // Wait for 0.5 secends
this.processIssuanceData(payload);
this.logger.log(`element log::::::`, element); //Remove after debugging
this.bulkIssuanceQueue.add(
{
data: element.credential_data,
fileUploadId: element.fileUploadId,
clientId: clientDetails.clientId,
cacheId: requestId,
credentialDefinitionId: element.credDefId,
schemaLedgerId: element.schemaId,
isRetry: false,
orgId,
id: element.id,
isLastData: respFile.indexOf(element) === respFile.length - 1
},
{ delay: 5000 }
);
} catch (error) {
this.logger.error(`Error processing issuance data: ${error}`);
}
Expand Down

0 comments on commit e28c784

Please sign in to comment.