Skip to content

Commit

Permalink
feat: queued messages reception time (#1824)
Browse files Browse the repository at this point in the history
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
  • Loading branch information
genaris committed Apr 10, 2024
1 parent 8154df4 commit 0b4b8dd
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 6 deletions.
1 change: 1 addition & 0 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
.receiveMessage(e.payload.message, {
connection: e.payload.connection,
contextCorrelationId: e.payload.contextCorrelationId,
receivedAt: e.payload.receivedAt,
})
.catch((error) => {
this.logger.error('Failed to process message', { error })
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ class Dispatcher {
payload: {
message,
connection,
receivedAt: messageContext.receivedAt,
},
})
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/agent/Events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export interface AgentMessageReceivedEvent extends BaseEvent {
message: unknown
connection?: ConnectionRecord
contextCorrelationId?: string
receivedAt?: Date
}
}

Expand All @@ -41,6 +42,7 @@ export interface AgentMessageProcessedEvent extends BaseEvent {
payload: {
message: AgentMessage
connection?: ConnectionRecord
receivedAt?: Date
}
}

Expand Down
21 changes: 15 additions & 6 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,13 @@ export class MessageReceiver {
session,
connection,
contextCorrelationId,
}: { session?: TransportSession; connection?: ConnectionRecord; contextCorrelationId?: string } = {}
receivedAt,
}: {
session?: TransportSession
connection?: ConnectionRecord
contextCorrelationId?: string
receivedAt?: Date
} = {}
) {
this.logger.debug(`Agent received message`)

Expand All @@ -93,9 +99,9 @@ export class MessageReceiver {

try {
if (this.isEncryptedMessage(inboundMessage)) {
await this.receiveEncryptedMessage(agentContext, inboundMessage as EncryptedMessage, session)
await this.receiveEncryptedMessage(agentContext, inboundMessage as EncryptedMessage, session, receivedAt)
} else if (this.isPlaintextMessage(inboundMessage)) {
await this.receivePlaintextMessage(agentContext, inboundMessage, connection)
await this.receivePlaintextMessage(agentContext, inboundMessage, connection, receivedAt)
} else {
throw new CredoError('Unable to parse incoming message: unrecognized format')
}
Expand All @@ -108,17 +114,19 @@ export class MessageReceiver {
private async receivePlaintextMessage(
agentContext: AgentContext,
plaintextMessage: PlaintextMessage,
connection?: ConnectionRecord
connection?: ConnectionRecord,
receivedAt?: Date
) {
const message = await this.transformAndValidate(agentContext, plaintextMessage)
const messageContext = new InboundMessageContext(message, { connection, agentContext })
const messageContext = new InboundMessageContext(message, { connection, agentContext, receivedAt })
await this.dispatcher.dispatch(messageContext)
}

private async receiveEncryptedMessage(
agentContext: AgentContext,
encryptedMessage: EncryptedMessage,
session?: TransportSession
session?: TransportSession,
receivedAt?: Date
) {
const decryptedMessage = await this.decryptMessage(agentContext, encryptedMessage)
const { plaintextMessage, senderKey, recipientKey } = decryptedMessage
Expand All @@ -140,6 +148,7 @@ export class MessageReceiver {
senderKey,
recipientKey,
agentContext,
receivedAt,
})

// We want to save a session if there is a chance of returning outbound message via inbound transport.
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/agent/models/InboundMessageContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export interface MessageContextParams {
senderKey?: Key
recipientKey?: Key
agentContext: AgentContext
receivedAt?: Date
}

export class InboundMessageContext<T extends AgentMessage = AgentMessage> {
Expand All @@ -19,6 +20,7 @@ export class InboundMessageContext<T extends AgentMessage = AgentMessage> {
public sessionId?: string
public senderKey?: Key
public recipientKey?: Key
public receivedAt: Date
public readonly agentContext: AgentContext

public constructor(message: T, context: MessageContextParams) {
Expand All @@ -28,6 +30,7 @@ export class InboundMessageContext<T extends AgentMessage = AgentMessage> {
this.connection = context.connection
this.sessionId = context.sessionId
this.agentContext = context.agentContext
this.receivedAt = context.receivedAt ?? new Date()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol {
(msg) =>
new Attachment({
id: msg.id,
lastmodTime: msg.receivedAt,
data: {
json: msg.encryptedMessage,
},
Expand Down Expand Up @@ -190,6 +191,7 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol {
(msg) =>
new Attachment({
id: msg.id,
lastmodTime: msg.receivedAt,
data: {
json: msg.encryptedMessage,
},
Expand Down Expand Up @@ -323,6 +325,7 @@ export class V2MessagePickupProtocol extends BaseMessagePickupProtocol {
payload: {
message: attachment.getDataAsJson<EncryptedMessage>(),
contextCorrelationId: messageContext.agentContext.contextCorrelationId,
receivedAt: attachment.lastmodTime,
},
})
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import type { EncryptedMessage } from '../../../types'

/**
* Basic representation of an encrypted message in a Message Pickup Queue
* - id: Message Pickup repository's specific queued message id (unrelated to DIDComm message id)
* - receivedAt: reception time (i.e. time when the message has been added to the queue)
* - encryptedMessage: packed message
*/
export type QueuedMessage = {
id: string
receivedAt?: Date
encryptedMessage: EncryptedMessage
}

0 comments on commit 0b4b8dd

Please sign in to comment.