Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: add AgentMessageSentEvent and associate records to outbound messages #1099

Merged
merged 6 commits into from
Nov 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions packages/action-menu/src/ActionMenuApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
ConnectionService,
Dispatcher,
MessageSender,
createOutboundMessage,
OutboundMessageContext,
injectable,
} from '@aries-framework/core'

Expand Down Expand Up @@ -59,8 +59,13 @@ export class ActionMenuApi {
connection,
})

const outboundMessage = createOutboundMessage(connection, message)
await this.messageSender.sendMessage(this.agentContext, outboundMessage)
const outboundMessageContext = new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection,
associatedRecord: record,
})

await this.messageSender.sendMessage(outboundMessageContext)

return record
}
Expand All @@ -80,8 +85,13 @@ export class ActionMenuApi {
menu: options.menu,
})

const outboundMessage = createOutboundMessage(connection, message)
await this.messageSender.sendMessage(this.agentContext, outboundMessage)
const outboundMessageContext = new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection,
associatedRecord: record,
})

await this.messageSender.sendMessage(outboundMessageContext)

return record
}
Expand Down Expand Up @@ -109,8 +119,13 @@ export class ActionMenuApi {
performedAction: options.performedAction,
})

const outboundMessage = createOutboundMessage(connection, message)
await this.messageSender.sendMessage(this.agentContext, outboundMessage)
const outboundMessageContext = new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection,
associatedRecord: record,
})

await this.messageSender.sendMessage(outboundMessageContext)

return record
}
Expand Down
45 changes: 20 additions & 25 deletions packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { OutboundMessage, OutboundServiceMessage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { AgentMessageProcessedEvent } from './Events'
import type { Handler } from './Handler'
Expand All @@ -14,7 +13,7 @@ import { ProblemReportMessage } from './../modules/problem-reports/messages/Prob
import { EventEmitter } from './EventEmitter'
import { AgentEventTypes } from './Events'
import { MessageSender } from './MessageSender'
import { isOutboundServiceMessage } from './helpers'
import { OutboundMessageContext } from './models'

@injectable()
class Dispatcher {
Expand All @@ -38,14 +37,14 @@ class Dispatcher {
}

public async dispatch(messageContext: InboundMessageContext): Promise<void> {
const message = messageContext.message
const { agentContext, connection, senderKey, recipientKey, message } = messageContext
const handler = this.getHandlerForType(message.type)

if (!handler) {
throw new AriesFrameworkError(`No handler for message type "${message.type}" found`)
}

let outboundMessage: OutboundMessage<AgentMessage> | OutboundServiceMessage<AgentMessage> | void
let outboundMessage: OutboundMessageContext<AgentMessage> | void

try {
outboundMessage = await handler.handle(messageContext)
Expand All @@ -54,43 +53,39 @@ class Dispatcher {

if (problemReportMessage instanceof ProblemReportMessage && messageContext.connection) {
problemReportMessage.setThread({
threadId: messageContext.message.threadId,
threadId: message.threadId,
})
outboundMessage = {
payload: problemReportMessage,
outboundMessage = new OutboundMessageContext(problemReportMessage, {
agentContext,
connection: messageContext.connection,
}
})
} else {
this.logger.error(`Error handling message with type ${message.type}`, {
message: message.toJSON(),
error,
senderKey: messageContext.senderKey?.fingerprint,
recipientKey: messageContext.recipientKey?.fingerprint,
connectionId: messageContext.connection?.id,
senderKey: senderKey?.fingerprint,
recipientKey: recipientKey?.fingerprint,
connectionId: connection?.id,
})

throw error
}
}

if (outboundMessage && isOutboundServiceMessage(outboundMessage)) {
await this.messageSender.sendMessageToService(messageContext.agentContext, {
message: outboundMessage.payload,
service: outboundMessage.service,
senderKey: outboundMessage.senderKey,
returnRoute: true,
})
} else if (outboundMessage) {
outboundMessage.sessionId = messageContext.sessionId
await this.messageSender.sendMessage(messageContext.agentContext, outboundMessage)
if (outboundMessage) {
if (outboundMessage.isOutboundServiceMessage()) {
await this.messageSender.sendMessageToService(outboundMessage)
} else {
outboundMessage.sessionId = messageContext.sessionId
await this.messageSender.sendMessage(outboundMessage)
}
}

// Emit event that allows to hook into received messages
this.eventEmitter.emit<AgentMessageProcessedEvent>(messageContext.agentContext, {
this.eventEmitter.emit<AgentMessageProcessedEvent>(agentContext, {
type: AgentEventTypes.AgentMessageProcessed,
payload: {
message: messageContext.message,
connection: messageContext.connection,
message,
connection,
},
})
}
Expand Down
10 changes: 10 additions & 0 deletions packages/core/src/agent/Events.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ConnectionRecord } from '../modules/connections'
import type { AgentMessage } from './AgentMessage'
import type { OutboundMessageContext, OutboundMessageSendStatus } from './models'
import type { Observable } from 'rxjs'

import { filter } from 'rxjs'
Expand All @@ -13,6 +14,7 @@ export function filterContextCorrelationId(contextCorrelationId: string) {
export enum AgentEventTypes {
AgentMessageReceived = 'AgentMessageReceived',
AgentMessageProcessed = 'AgentMessageProcessed',
AgentMessageSent = 'AgentMessageSent',
}

export interface EventMetadata {
Expand Down Expand Up @@ -41,3 +43,11 @@ export interface AgentMessageProcessedEvent extends BaseEvent {
connection?: ConnectionRecord
}
}

export interface AgentMessageSentEvent extends BaseEvent {
type: typeof AgentEventTypes.AgentMessageSent
payload: {
message: OutboundMessageContext
status: OutboundMessageSendStatus
}
}
5 changes: 2 additions & 3 deletions packages/core/src/agent/Handler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type { OutboundMessage, OutboundServiceMessage } from '../types'
import type { ConstructableAgentMessage } from './AgentMessage'
import type { InboundMessageContext } from './models/InboundMessageContext'
import type { InboundMessageContext, OutboundMessageContext } from './models'

export interface Handler {
readonly supportedMessages: readonly ConstructableAgentMessage[]

handle(messageContext: InboundMessageContext): Promise<OutboundMessage | OutboundServiceMessage | void>
handle(messageContext: InboundMessageContext): Promise<OutboundMessageContext | void>
}

/**
Expand Down
9 changes: 4 additions & 5 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import { EnvelopeService } from './EnvelopeService'
import { MessageSender } from './MessageSender'
import { TransportService } from './TransportService'
import { AgentContextProvider } from './context'
import { createOutboundMessage } from './helpers'
import { InboundMessageContext } from './models/InboundMessageContext'
import { InboundMessageContext, OutboundMessageContext } from './models'

@injectable()
export class MessageReceiver {
Expand Down Expand Up @@ -277,9 +276,9 @@ export class MessageReceiver {
problemReportMessage.setThread({
threadId: plaintextMessage['@id'],
})
const outboundMessage = createOutboundMessage(connection, problemReportMessage)
if (outboundMessage) {
await this.messageSender.sendMessage(agentContext, outboundMessage)
const outboundMessageContext = new OutboundMessageContext(problemReportMessage, { agentContext, connection })
if (outboundMessageContext) {
await this.messageSender.sendMessage(outboundMessageContext)
}
}
}
Loading