Skip to content

Commit

Permalink
feat!: add AgentMessageSentEvent and associate records to outbound me…
Browse files Browse the repository at this point in the history
…ssages (#1099)

Signed-off-by: Ariel Gentile <gentilester@gmail.com>
  • Loading branch information
genaris authored Nov 24, 2022
1 parent 03cdf39 commit c68145a
Show file tree
Hide file tree
Showing 57 changed files with 1,082 additions and 491 deletions.
29 changes: 22 additions & 7 deletions packages/action-menu/src/ActionMenuApi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
ConnectionService,
Dispatcher,
MessageSender,
createOutboundMessage,
OutboundMessageContext,
injectable,
} from '@aries-framework/core'

Expand Down Expand Up @@ -59,8 +59,13 @@ export class ActionMenuApi {
connection,
})

const outboundMessage = createOutboundMessage(connection, message)
await this.messageSender.sendMessage(this.agentContext, outboundMessage)
const outboundMessageContext = new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection,
associatedRecord: record,
})

await this.messageSender.sendMessage(outboundMessageContext)

return record
}
Expand All @@ -80,8 +85,13 @@ export class ActionMenuApi {
menu: options.menu,
})

const outboundMessage = createOutboundMessage(connection, message)
await this.messageSender.sendMessage(this.agentContext, outboundMessage)
const outboundMessageContext = new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection,
associatedRecord: record,
})

await this.messageSender.sendMessage(outboundMessageContext)

return record
}
Expand Down Expand Up @@ -109,8 +119,13 @@ export class ActionMenuApi {
performedAction: options.performedAction,
})

const outboundMessage = createOutboundMessage(connection, message)
await this.messageSender.sendMessage(this.agentContext, outboundMessage)
const outboundMessageContext = new OutboundMessageContext(message, {
agentContext: this.agentContext,
connection,
associatedRecord: record,
})

await this.messageSender.sendMessage(outboundMessageContext)

return record
}
Expand Down
45 changes: 20 additions & 25 deletions packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { OutboundMessage, OutboundServiceMessage } from '../types'
import type { AgentMessage } from './AgentMessage'
import type { AgentMessageProcessedEvent } from './Events'
import type { Handler } from './Handler'
Expand All @@ -14,7 +13,7 @@ import { ProblemReportMessage } from './../modules/problem-reports/messages/Prob
import { EventEmitter } from './EventEmitter'
import { AgentEventTypes } from './Events'
import { MessageSender } from './MessageSender'
import { isOutboundServiceMessage } from './helpers'
import { OutboundMessageContext } from './models'

@injectable()
class Dispatcher {
Expand All @@ -38,14 +37,14 @@ class Dispatcher {
}

public async dispatch(messageContext: InboundMessageContext): Promise<void> {
const message = messageContext.message
const { agentContext, connection, senderKey, recipientKey, message } = messageContext
const handler = this.getHandlerForType(message.type)

if (!handler) {
throw new AriesFrameworkError(`No handler for message type "${message.type}" found`)
}

let outboundMessage: OutboundMessage<AgentMessage> | OutboundServiceMessage<AgentMessage> | void
let outboundMessage: OutboundMessageContext<AgentMessage> | void

try {
outboundMessage = await handler.handle(messageContext)
Expand All @@ -54,43 +53,39 @@ class Dispatcher {

if (problemReportMessage instanceof ProblemReportMessage && messageContext.connection) {
problemReportMessage.setThread({
threadId: messageContext.message.threadId,
threadId: message.threadId,
})
outboundMessage = {
payload: problemReportMessage,
outboundMessage = new OutboundMessageContext(problemReportMessage, {
agentContext,
connection: messageContext.connection,
}
})
} else {
this.logger.error(`Error handling message with type ${message.type}`, {
message: message.toJSON(),
error,
senderKey: messageContext.senderKey?.fingerprint,
recipientKey: messageContext.recipientKey?.fingerprint,
connectionId: messageContext.connection?.id,
senderKey: senderKey?.fingerprint,
recipientKey: recipientKey?.fingerprint,
connectionId: connection?.id,
})

throw error
}
}

if (outboundMessage && isOutboundServiceMessage(outboundMessage)) {
await this.messageSender.sendMessageToService(messageContext.agentContext, {
message: outboundMessage.payload,
service: outboundMessage.service,
senderKey: outboundMessage.senderKey,
returnRoute: true,
})
} else if (outboundMessage) {
outboundMessage.sessionId = messageContext.sessionId
await this.messageSender.sendMessage(messageContext.agentContext, outboundMessage)
if (outboundMessage) {
if (outboundMessage.isOutboundServiceMessage()) {
await this.messageSender.sendMessageToService(outboundMessage)
} else {
outboundMessage.sessionId = messageContext.sessionId
await this.messageSender.sendMessage(outboundMessage)
}
}

// Emit event that allows to hook into received messages
this.eventEmitter.emit<AgentMessageProcessedEvent>(messageContext.agentContext, {
this.eventEmitter.emit<AgentMessageProcessedEvent>(agentContext, {
type: AgentEventTypes.AgentMessageProcessed,
payload: {
message: messageContext.message,
connection: messageContext.connection,
message,
connection,
},
})
}
Expand Down
10 changes: 10 additions & 0 deletions packages/core/src/agent/Events.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import type { ConnectionRecord } from '../modules/connections'
import type { AgentMessage } from './AgentMessage'
import type { OutboundMessageContext, OutboundMessageSendStatus } from './models'
import type { Observable } from 'rxjs'

import { filter } from 'rxjs'
Expand All @@ -13,6 +14,7 @@ export function filterContextCorrelationId(contextCorrelationId: string) {
export enum AgentEventTypes {
AgentMessageReceived = 'AgentMessageReceived',
AgentMessageProcessed = 'AgentMessageProcessed',
AgentMessageSent = 'AgentMessageSent',
}

export interface EventMetadata {
Expand Down Expand Up @@ -41,3 +43,11 @@ export interface AgentMessageProcessedEvent extends BaseEvent {
connection?: ConnectionRecord
}
}

export interface AgentMessageSentEvent extends BaseEvent {
type: typeof AgentEventTypes.AgentMessageSent
payload: {
message: OutboundMessageContext
status: OutboundMessageSendStatus
}
}
5 changes: 2 additions & 3 deletions packages/core/src/agent/Handler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type { OutboundMessage, OutboundServiceMessage } from '../types'
import type { ConstructableAgentMessage } from './AgentMessage'
import type { InboundMessageContext } from './models/InboundMessageContext'
import type { InboundMessageContext, OutboundMessageContext } from './models'

export interface Handler {
readonly supportedMessages: readonly ConstructableAgentMessage[]

handle(messageContext: InboundMessageContext): Promise<OutboundMessage | OutboundServiceMessage | void>
handle(messageContext: InboundMessageContext): Promise<OutboundMessageContext | void>
}

/**
Expand Down
9 changes: 4 additions & 5 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ import { EnvelopeService } from './EnvelopeService'
import { MessageSender } from './MessageSender'
import { TransportService } from './TransportService'
import { AgentContextProvider } from './context'
import { createOutboundMessage } from './helpers'
import { InboundMessageContext } from './models/InboundMessageContext'
import { InboundMessageContext, OutboundMessageContext } from './models'

@injectable()
export class MessageReceiver {
Expand Down Expand Up @@ -277,9 +276,9 @@ export class MessageReceiver {
problemReportMessage.setThread({
threadId: plaintextMessage['@id'],
})
const outboundMessage = createOutboundMessage(connection, problemReportMessage)
if (outboundMessage) {
await this.messageSender.sendMessage(agentContext, outboundMessage)
const outboundMessageContext = new OutboundMessageContext(problemReportMessage, { agentContext, connection })
if (outboundMessageContext) {
await this.messageSender.sendMessage(outboundMessageContext)
}
}
}
Loading

0 comments on commit c68145a

Please sign in to comment.