Skip to content

Commit

Permalink
feat(rpc/service): integrate new handler class
Browse files Browse the repository at this point in the history
  • Loading branch information
CheerlessCloud committed Oct 2, 2018
1 parent 14c3af7 commit 587cbd5
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 87 deletions.
129 changes: 48 additions & 81 deletions src/Service.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import EError from 'eerror';
import type { IMessage } from './AMQPMessage';
import { type ConnectOptions, type QueueOptions } from './AMQPAdapter';
import AdapterConsumer from './AdapterConsumer';
import type { IRpcServiceHandler } from './IRpcServiceHandler';
import HandlerMap from './rpc/HandlerMap';
import type { IHandler } from './rpc/IHandler';

type RpcServiceQueueOptions = { ...$Exact<QueueOptions>, prefetch?: number };

Expand All @@ -15,8 +16,7 @@ type RpcServiceConstructorOptions = {
};

class RpcService extends AdapterConsumer {
// eslint-disable-next-line flowtype/generic-spacing
_handlers: Map<string, Class<IRpcServiceHandler>> = new Map();
_handlerMap: HandlerMap = new HandlerMap();
_service: string;
_version: string;
_subscribeState: 'uninitiated' | 'functionalHandler' | 'classHandler' = 'uninitiated';
Expand Down Expand Up @@ -67,7 +67,7 @@ class RpcService extends AdapterConsumer {
);
}

async _addSubscriber(handler: IMessage => Promise<any> | any) {
async _initSubscriber() {
const adapter = this._getAdapter();
await adapter.ensureQueue({ name: this.queueName, ...this._queueOptions });
const { prefetch = 1 } = this._queueOptions || {};
Expand All @@ -78,109 +78,76 @@ class RpcService extends AdapterConsumer {
{
noAck: false,
},
handler,
message => this._messageHandler(message),
);
}

async addHandler(handler: Class<IRpcServiceHandler>) {
if (this._subscribeState === 'functionalHandler') {
throw new Error('Functional handler already set');
}

const { action } = handler.prototype;

if (!action) {
throw new Error('Handler must implement IRpcServiceHandler interface');
}
async _onInit() {
await this._initSubscriber();
}

if (this._handlers.has(action)) {
throw new Error('Handler for this action already set');
}
async addHandler(handler: Class<IHandler>) {
this._handlerMap.add(handler);
}

this._handlers.set(action, handler);
async _getHandlerClassByMessage(message: IMessage): Promise<?Class<IHandler>> {
const { type: action } = message._props;
const Handler = this._handlerMap.get(action);

if (this._subscribeState === 'uninitiated') {
await this._addSubscriber(message => this._classMessageHandler(message));
this._subscribeState = 'classHandler';
if (Handler) {
return Handler;
}
}

// @todo merge _classMessageHandler and _functionalMessageHandler and extract to class
async _classMessageHandler(message: IMessage): Promise<void> {
const { type: action = 'default' } = message._props;
const RpcServiceHandler = this._handlers.get(action);
const mustRequeue = !message._props.redelivered;

if (!RpcServiceHandler) {
await message.reject(mustRequeue);
try {
// @todo: retry in other instance
const error = new EError('Handler for action not found').combine({
action,
messageId: message.id,
requeue: mustRequeue,
});
this._errorHandler(error);

if (!mustRequeue) {
await this._reply(message, null, error);
}

return;
await message.reject();
await this._reply(message, null, error);
} catch (error) {
this._errorHandler(error);
}
}

let handler = null;
async _constructHandler(Handler: Class<IHandler>, message: IMessage): Promise<?IHandler> {
try {
handler = new RpcServiceHandler({ service: this, message });
const handler: IHandler = new Handler({ service: this, message });
return handler;
} catch (err) {
await message.reject(mustRequeue);
const error = new EError('Error on construct class handler').combine({
action,
messageId: message.id,
requeue: mustRequeue,
});
this._errorHandler(error);
if (!mustRequeue) {
try {
// @todo: review and refactor behavior error on construct handler
const error = new EError('Error on construct class handler').combine({
action: Handler.prototype.action,
messageId: message.id,
originalError: err,
});
this._errorHandler(error);
await message.reject();
await this._reply(message, null, error);
} catch (error) {
this._errorHandler(error);
}
return;
}
}

let isSuccess = false;
try {
await handler.beforeHandle();

const replyPayload = await handler.handle();

await this._reply(message, replyPayload);
isSuccess = true;
async _messageHandler(message: IMessage): Promise<void> {
const Handler: ?Class<IHandler> = await this._getHandlerClassByMessage(message);

await handler.onSuccess();
} catch (err) {
await this._reply(message, null, err);
await handler.onFail(err);
} finally {
await handler.afterHandle();
if (!Handler) {
return;
}

if (message.isSealed) {
return; // eslint-disable-line no-unsafe-finally
}
const handler: ?IHandler = await this._constructHandler(Handler, message);

try {
if (isSuccess) {
await message.ack();
} else {
await message.reject();
}
} catch (err) {
this._errorHandler(
EError.wrap(err, {
action,
messageId: message.id,
isSuccess,
subMessage: 'Error at message ack/reject',
}),
);
}
if (!handler) {
return;
}

await handler.execute();
}

async interventSignalInterceptors({
Expand Down
12 changes: 6 additions & 6 deletions test/rpc-integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import uuid from 'uuid/v4';
import EError from 'eerror';
import RpcClient from '../src/Client';
import RpcService from '../src/Service';
import RpcServiceHandler from '../src/RpcServiceHandler';
import RpcHandler from '../src/rpc/Handler';

test.beforeEach(async t => {
const ctx = {};
Expand Down Expand Up @@ -45,7 +45,7 @@ test('service and client basic integration', async t => {
const reply = { bar: 'foo' };

await service.addHandler(
class extends RpcServiceHandler {
class extends RpcHandler {
async handle() {
t.deepEqual(this.payload, payload);
return reply;
Expand Down Expand Up @@ -75,7 +75,7 @@ test('send payload to service without wait response', async t => {
let onHandleExecuted = () => {};

await service.addHandler(
class extends RpcServiceHandler {
class extends RpcHandler {
async handle() {
t.deepEqual(this.payload, payload);
t.is(sendIsReturnedResult, true);
Expand Down Expand Up @@ -109,7 +109,7 @@ test('class-based handler for service', async t => {
const reply = { bar: 'foo' };

await service.addHandler(
class extends RpcServiceHandler {
class extends RpcHandler {
get action() {
return 'myAction';
}
Expand Down Expand Up @@ -141,7 +141,7 @@ test('correct pass error from service', async t => {
});

await service.addHandler(
class extends RpcServiceHandler {
class extends RpcHandler {
async handle() {
throw EError.wrap(error, this.payload);
}
Expand Down Expand Up @@ -171,7 +171,7 @@ test('throw error to client on not found action', async t => {
await service.ensureConnection();

await service.addHandler(
class extends RpcServiceHandler {
class extends RpcHandler {
get action() {
return 'myAction';
}
Expand Down

0 comments on commit 587cbd5

Please sign in to comment.