Skip to content

Commit

Permalink
fix(rpc): pass all props of original message to retry message
Browse files Browse the repository at this point in the history
  • Loading branch information
CheerlessCloud committed Feb 22, 2019
1 parent 95f2b3f commit 7ef8884
Showing 1 changed file with 33 additions and 9 deletions.
42 changes: 33 additions & 9 deletions src/AMQPMessageRpcController.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @flow
import type { PublishOptions } from 'amqplib';
import AMQPMessageController from './AMQPMessageController';
import AMQPMessage from './AMQPMessage';
import RpcService from './Service';
Expand Down Expand Up @@ -30,24 +31,47 @@ class AMQPMessageRpcController extends AMQPMessageController {
}

async resendAsRetry() {
const { messageId, correlationId, replyTo } = this._message.props;

const retryLimit = this._message.applicationLevelRetryLimit;

if (retryLimit === null) {
throw new Error('Retry disabled');
}

const adapter = this._service._getAdapter();
await adapter.send(this._message.sourceQueue, this._message.payload, {
messageId,
correlationId,
replyTo,

await adapter.send(
this._message.sourceQueue,
this._message.payload,
this._getPublishOptionsForRetry(),
);
}

_getPublishOptionsForRetry() {
const { props, applicationLevelRetryLimit } = this._message;

// @todo decremnt expiration
// @todo pass routing key
const mapped: PublishOptions = {
expiration: props.expiration,
correlationId: props.correlationId,
replyTo: props.replyTo,
exchange: props.exchange,
userId: props.userId,
priority: props.priority,
persistent: props.persistent,
contentType: props.contentType,
contentEncoding: props.contentEncoding,
timestamp: props.timestamp,
type: props.type,
appId: props.appId,
messageId: props.messageId,
headers: {
...this._message.props.headers,
'X-Retry-Limit': retryLimit,
...props.headers,
'X-Retry-Limit': applicationLevelRetryLimit,
},
});
};

return mapped;
}
}

Expand Down

0 comments on commit 7ef8884

Please sign in to comment.