-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsqsmv.js
93 lines (77 loc) · 2.6 KB
/
sqsmv.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
const { SQSClient, ReceiveMessageCommand, DeleteMessageBatchCommand, SendMessageBatchCommand, GetQueueUrlCommand } = require('@aws-sdk/client-sqs');
const chunk = require('chunk');
require("array-foreach-async");
module.exports = class Sqsmv {
constructor() {
this.sqs = new SQSClient({});
this.source = null;
this.destination = null;
this.maxMessages = 10;
this.waitTimeSeconds = 0;
}
async setSource(source) {
this.source = await this.getQueueUrl(source);
}
async setDestination(destination) {
this.destination = await this.getQueueUrl(destination);
}
async getQueueUrl(name) {
if (name.startsWith('https://')) {
return name;
}
const data = await this.sqs.send(new GetQueueUrlCommand({
QueueName: name
}));
return data.QueueUrl;
}
async tick(parallel) {
let items = [];
for (let i = 0; i < parallel; i++) {
items.push(this.process());
}
return (await Promise.all(items))
.reduce((total, current) => total + current);
}
async process() {
const result = await this.sqs.send(new ReceiveMessageCommand({
QueueUrl: this.source,
MaxNumberOfMessages: this.maxMessages,
WaitTimeSeconds: this.waitTimeSeconds
}));
const messages = result.Messages;
if (!messages || !messages.length) {
return 0;
}
await this._sendBulk(messages.map((message) => {
return {
Id: message.MessageId,
MessageBody: message.Body,
MessageAttributes: message.MessageAttributes || {},
};
}));
await this.sqs.send(new DeleteMessageBatchCommand({
Entries: messages.map((message) => {
return {
Id: message.MessageId,
ReceiptHandle: message.ReceiptHandle,
}
}),
QueueUrl: this.source
}));
return messages.length;
}
async _sendBulk(messages) {
try {
await this.sqs.send(new SendMessageBatchCommand({
QueueUrl: this.destination,
Entries: messages
}))
} catch (e) {
if (e.message !== "BatchRequestTooLong" || messages.length <= 1 || e.code === "BatchRequestTooLong") {
throw e;
}
await chunk(messages, Math.floor(messages.length / 2))
.forEachAsync(async (items) => await this._sendBulk(items));
}
}
};