Skip to content

Commit

Permalink
feat(basic-messages): improve sending error handling
Browse files Browse the repository at this point in the history
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
  • Loading branch information
genaris committed Oct 6, 2022
1 parent 0d14a71 commit ca74456
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 3 deletions.
21 changes: 21 additions & 0 deletions packages/core/src/error/MessageSendingError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import type { AgentMessage } from '../agent/AgentMessage'
import type { BaseRecord } from '../storage/BaseRecord'

import { AriesFrameworkError } from './AriesFrameworkError'

export class MessageSendingError extends AriesFrameworkError {
public agentMessage: AgentMessage
public associatedRecord?: BaseRecord
public constructor(
message: string,
{
agentMessage,
associatedRecord,
cause,
}: { agentMessage: AgentMessage; associatedRecord?: BaseRecord; cause?: Error }
) {
super(message, { cause })
this.agentMessage = agentMessage
this.associatedRecord = associatedRecord
}
}
1 change: 1 addition & 0 deletions packages/core/src/error/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ export * from './RecordNotFoundError'
export * from './RecordDuplicateError'
export * from './IndySdkError'
export * from './ClassValidationError'
export * from './MessageSendingError'
54 changes: 52 additions & 2 deletions packages/core/src/modules/basic-messages/BasicMessagesModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { BasicMessageTags } from './repository/BasicMessageRecord'
import { Dispatcher } from '../../agent/Dispatcher'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { MessageSendingError } from '../../error/MessageSendingError'
import { injectable, module } from '../../plugins'
import { ConnectionService } from '../connections'

Expand All @@ -30,18 +31,67 @@ export class BasicMessagesModule {
this.registerHandlers(dispatcher)
}

/**
* Send a message to an active connection
*
* @param connectionId Connection Id
* @param message Message contents
* @throws {RecordNotFoundError} If connection is not found
* @throws {MessageSendingError} If message is undeliverable
* @returns the created record
*/
public async sendMessage(connectionId: string, message: string) {
const connection = await this.connectionService.getById(connectionId)

const basicMessage = await this.basicMessageService.createMessage(message, connection)
const { message: basicMessage, record: basicMessageRecord } = await this.basicMessageService.createMessage(
message,
connection
)
const outboundMessage = createOutboundMessage(connection, basicMessage)
await this.messageSender.sendMessage(outboundMessage)
try {
await this.messageSender.sendMessage(outboundMessage)
} catch (error) {
throw new MessageSendingError(`Error sending basic message: ${error.message}`, {
agentMessage: basicMessage,
associatedRecord: basicMessageRecord,
cause: error,
})
}
return basicMessageRecord
}

/**
* Retrieve all basic messages matching a given query
*
* @param query The query
* @returns array containing all matching records
*/
public async findAllByQuery(query: Partial<BasicMessageTags>) {
return this.basicMessageService.findAllByQuery(query)
}

/**
* Retrieve a basic message record by id
*
* @param basicMessageRecordId The basic message record id
* @throws {RecordNotFoundError} If no record is found
* @return The basic message record
*
*/
public async getById(basicMessageRecordId: string) {
return this.basicMessageService.getById(basicMessageRecordId)
}

/**
* Delete a basic message record by id
*
* @param connectionId the basic message record id
* @throws {RecordNotFoundError} If no record is found
*/
public async deleteById(basicMessageRecordId: string) {
await this.basicMessageService.deleteById(basicMessageRecordId)
}

private registerHandlers(dispatcher: Dispatcher) {
dispatcher.registerHandler(new BasicMessageHandler(this.basicMessageService))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/* eslint-disable @typescript-eslint/no-non-null-assertion */
import type { SubjectMessage } from '../../../../../../tests/transport/SubjectInboundTransport'
import type { ConnectionRecord } from '../../../modules/connections'

import { Subject } from 'rxjs'

import { SubjectInboundTransport } from '../../../../../../tests/transport/SubjectInboundTransport'
import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport'
import { getBaseConfig, makeConnection, waitForBasicMessage } from '../../../../tests/helpers'
import testLogger from '../../../../tests/logger'
import { Agent } from '../../../agent/Agent'
import { MessageSendingError, RecordNotFoundError } from '../../../error'
import { BasicMessageRecord } from '../repository'

const faberConfig = getBaseConfig('Faber Basic Messages', {
endpoints: ['rxjs:faber'],
})

const aliceConfig = getBaseConfig('Alice Basic Messages', {
endpoints: ['rxjs:alice'],
})

describe('Basic Messages E2E', () => {
let faberAgent: Agent
let aliceAgent: Agent
let faberConnection: ConnectionRecord
let aliceConnection: ConnectionRecord

beforeEach(async () => {
const faberMessages = new Subject<SubjectMessage>()
const aliceMessages = new Subject<SubjectMessage>()
const subjectMap = {
'rxjs:faber': faberMessages,
'rxjs:alice': aliceMessages,
}

faberAgent = new Agent(faberConfig.config, faberConfig.agentDependencies)
faberAgent.registerInboundTransport(new SubjectInboundTransport(faberMessages))
faberAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
await faberAgent.initialize()

aliceAgent = new Agent(aliceConfig.config, aliceConfig.agentDependencies)
aliceAgent.registerInboundTransport(new SubjectInboundTransport(aliceMessages))
aliceAgent.registerOutboundTransport(new SubjectOutboundTransport(subjectMap))
await aliceAgent.initialize()
;[aliceConnection, faberConnection] = await makeConnection(aliceAgent, faberAgent)
})

afterEach(async () => {
await faberAgent.shutdown()
await faberAgent.wallet.delete()
await aliceAgent.shutdown()
await aliceAgent.wallet.delete()
})

test('Alice and Faber exchange messages', async () => {
testLogger.test('Alice sends message to Faber')
const helloRecord = await aliceAgent.basicMessages.sendMessage(aliceConnection.id, 'Hello')

expect(helloRecord.content).toBe('Hello')

testLogger.test('Faber waits for message from Alice')
await waitForBasicMessage(faberAgent, {
content: 'Hello',
})

testLogger.test('Faber sends message to Alice')
const replyRecord = await faberAgent.basicMessages.sendMessage(faberConnection.id, 'How are you?')
expect(replyRecord.content).toBe('How are you?')

testLogger.test('Alice waits until she receives message from faber')
await waitForBasicMessage(aliceAgent, {
content: 'How are you?',
})
})

test('Alice is unable to send a message', async () => {
testLogger.test('Alice sends message to Faber that is undeliverable')

const spy = jest.spyOn(aliceAgent.outboundTransports[0], 'sendMessage').mockRejectedValue(new Error('any error'))

await expect(aliceAgent.basicMessages.sendMessage(aliceConnection.id, 'Hello')).rejects.toThrowError(
MessageSendingError
)
try {
await aliceAgent.basicMessages.sendMessage(aliceConnection.id, 'Hello undeliverable')
} catch (error) {
const thrownError = error as MessageSendingError
expect(thrownError.message).toEqual(
`Error sending basic message: Message is undeliverable to connection ${aliceConnection.id} (${aliceConnection.theirLabel})`
)
testLogger.test('Error thrown includes the message and recently created record')
expect(thrownError.associatedRecord).toBeInstanceOf(BasicMessageRecord)
expect((thrownError.associatedRecord as BasicMessageRecord).content).toBe('Hello undeliverable')

testLogger.test('Created record can be found and deleted by id')
const storedRecord = await aliceAgent.basicMessages.getById(thrownError.associatedRecord!.id)
expect(storedRecord).toMatchObject(thrownError.associatedRecord!)

await aliceAgent.basicMessages.deleteById(storedRecord.id)
await expect(aliceAgent.basicMessages.getById(thrownError.associatedRecord!.id)).rejects.toThrowError(
RecordNotFoundError
)
}
spy.mockClear()
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export class BasicMessageService {
await this.basicMessageRepository.save(basicMessageRecord)
this.emitStateChangedEvent(basicMessageRecord, basicMessage)

return basicMessage
return { message: basicMessage, record: basicMessageRecord }
}

/**
Expand Down Expand Up @@ -64,4 +64,13 @@ export class BasicMessageService {
public async findAllByQuery(query: Partial<BasicMessageTags>) {
return this.basicMessageRepository.findByQuery(query)
}

public async getById(basicMessageRecordId: string) {
return this.basicMessageRepository.getById(basicMessageRecordId)
}

public async deleteById(basicMessageRecordId: string) {
const basicMessageRecord = await this.getById(basicMessageRecordId)
return this.basicMessageRepository.delete(basicMessageRecord)
}
}

0 comments on commit ca74456

Please sign in to comment.