Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(routing): allow to discover mediator pickup strategy #669

3 changes: 1 addition & 2 deletions packages/core/src/agent/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { AriesFrameworkError } from '../error'
import { ConsoleLogger, LogLevel } from '../logger'
import { AutoAcceptCredential } from '../modules/credentials/CredentialAutoAcceptType'
import { AutoAcceptProof } from '../modules/proofs/ProofAutoAcceptType'
import { MediatorPickupStrategy } from '../modules/routing/MediatorPickupStrategy'
import { DidCommMimeType } from '../types'

export class AgentConfig {
Expand Down Expand Up @@ -77,7 +76,7 @@ export class AgentConfig {
}

public get mediatorPickupStrategy() {
return this.initConfig.mediatorPickupStrategy ?? MediatorPickupStrategy.Explicit
return this.initConfig.mediatorPickupStrategy
}

public get endpoints(): [string, ...string[]] {
Expand Down
78 changes: 73 additions & 5 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import type { AgentMessageProcessedEvent } from '../../agent/Events'
import type { Logger } from '../../logger'
import type { OutboundWebSocketClosedEvent } from '../../transport'
import type { OutboundMessage } from '../../types'
import type { ConnectionRecord } from '../connections'
import type { MediationStateChangedEvent } from './RoutingEvents'
import type { MediationRecord } from './index'

import { firstValueFrom, interval, ReplaySubject, timer } from 'rxjs'
import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen } from 'rxjs/operators'
import { firstValueFrom, interval, of, ReplaySubject, timer } from 'rxjs'
import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen, catchError, map } from 'rxjs/operators'
import { Lifecycle, scoped } from 'tsyringe'

import { AgentConfig } from '../../agent/AgentConfig'
import { Dispatcher } from '../../agent/Dispatcher'
import { EventEmitter } from '../../agent/EventEmitter'
import { AgentEventTypes } from '../../agent/Events'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { AriesFrameworkError } from '../../error'
import { TransportEventTypes } from '../../transport'
import { parseMessageType } from '../../utils/messageType'
import { ConnectionInvitationMessage } from '../connections'
import { ConnectionService } from '../connections/services'
import { DiscloseMessage, DiscoverFeaturesModule } from '../discover-features'

import { MediatorPickupStrategy } from './MediatorPickupStrategy'
import { RoutingEventTypes } from './RoutingEvents'
Expand All @@ -26,6 +30,7 @@ import { MediationDenyHandler } from './handlers/MediationDenyHandler'
import { MediationGrantHandler } from './handlers/MediationGrantHandler'
import { BatchPickupMessage } from './messages/BatchPickupMessage'
import { MediationState } from './models/MediationState'
import { MediationRepository } from './repository'
import { MediationRecipientService } from './services/MediationRecipientService'

@scoped(Lifecycle.ContainerScoped)
Expand All @@ -36,21 +41,27 @@ export class RecipientModule {
private messageSender: MessageSender
private eventEmitter: EventEmitter
private logger: Logger
private discoverFeaturesModule: DiscoverFeaturesModule
private mediationRepository: MediationRepository

public constructor(
dispatcher: Dispatcher,
agentConfig: AgentConfig,
mediationRecipientService: MediationRecipientService,
connectionService: ConnectionService,
messageSender: MessageSender,
eventEmitter: EventEmitter
eventEmitter: EventEmitter,
discoverFeaturesModule: DiscoverFeaturesModule,
mediationRepository: MediationRepository
) {
this.agentConfig = agentConfig
this.connectionService = connectionService
this.mediationRecipientService = mediationRecipientService
this.messageSender = messageSender
this.eventEmitter = eventEmitter
this.logger = agentConfig.logger
this.discoverFeaturesModule = discoverFeaturesModule
this.mediationRepository = mediationRepository
this.registerHandlers(dispatcher)
}

Expand Down Expand Up @@ -153,8 +164,8 @@ export class RecipientModule {
}

public async initiateMessagePickup(mediator: MediationRecord) {
const { mediatorPickupStrategy, mediatorPollingInterval } = this.agentConfig

const { mediatorPollingInterval } = this.agentConfig
const mediatorPickupStrategy = await this.getPickupStrategyForMediator(mediator)
const mediatorConnection = await this.connectionService.getById(mediator.connectionId)

// Explicit means polling every X seconds with batch message
Expand All @@ -181,6 +192,63 @@ export class RecipientModule {
}
}

private async getPickupStrategyForMediator(mediator: MediationRecord) {
let mediatorPickupStrategy = mediator.pickupStrategy ?? this.agentConfig.mediatorPickupStrategy

// If mediator pickup strategy is not configured we try to query if batch pickup
// is supported through the discover features protocol
if (!mediatorPickupStrategy) {
const isBatchPickupSupported = await this.isBatchPickupSupportedByMediator(mediator)

// Use explicit pickup strategy
mediatorPickupStrategy = isBatchPickupSupported
? MediatorPickupStrategy.Explicit
: MediatorPickupStrategy.Implicit

// Store the result so it can be reused next time
mediator.pickupStrategy = mediatorPickupStrategy
await this.mediationRepository.update(mediator)
}

return mediatorPickupStrategy
}

private async isBatchPickupSupportedByMediator(mediator: MediationRecord) {
const { protocolUri } = parseMessageType(BatchPickupMessage.type)

// Listen for response to our feature query
const replaySubject = new ReplaySubject(1)
this.eventEmitter
.observable<AgentMessageProcessedEvent>(AgentEventTypes.AgentMessageProcessed)
.pipe(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not too sure what will be listened to here, but these are a lot of operations. Can't we combine the maps and filter? This might make it a bit less readable but performance should go "up" (probably a very very minor increase though).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah sure. This method is really only called once per mediator you connect to as s recipient so was not too worried about adding a few extra method calls. rxjs can be hard to read so slicing it into small steps may help future readers (including me!)

Will update

// Stop when the agent shuts down
takeUntil(this.agentConfig.stop$),
// filter by mediator connection id and query disclose message type
filter(
(e) => e.payload.connection?.id === mediator.connectionId && e.payload.message.type === DiscloseMessage.type
),
// Return whether the protocol is supported
map((e) => {
const message = e.payload.message as DiscloseMessage
return message.protocols.map((p) => p.protocolId).includes(protocolUri)
}),
// TODO: make configurable
// If we don't have an answer in 7 seconds (no response, not supported, etc...) error
timeout(7000),
// We want to return false if an error occurred
catchError(() => of(false))
)
.subscribe(replaySubject)

await this.discoverFeaturesModule.queryFeatures(mediator.connectionId, {
query: protocolUri,
comment: 'Detect if batch pickup is supported to determine pickup strategy for messages',
})

const isBatchPickupSupported = await firstValueFrom(replaySubject)
return isBatchPickupSupported
}

public async discoverMediation() {
return this.mediationRecipientService.discoverMediation()
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/modules/routing/__tests__/mediation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { SubjectOutboundTransport } from '../../../../../../tests/transport/Subj
import { getBaseConfig, waitForBasicMessage } from '../../../../tests/helpers'
import { Agent } from '../../../agent/Agent'
import { ConnectionRecord } from '../../connections'
import { MediatorPickupStrategy } from '../MediatorPickupStrategy'
import { MediationState } from '../models/MediationState'

const recipientConfig = getBaseConfig('Mediation: Recipient')
Expand Down Expand Up @@ -93,6 +94,7 @@ describe('mediator establishment', () => {
expect(recipientMediatorConnection).toBeConnectedWith(mediatorRecipientConnection)

expect(recipientMediator?.state).toBe(MediationState.Granted)
expect(recipientMediator?.pickupStrategy).toBe(MediatorPickupStrategy.Explicit)

// Initialize sender agent
senderAgent = new Agent(senderConfig.config, senderConfig.agentDependencies)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { MediatorPickupStrategy } from '../MediatorPickupStrategy'
import type { MediationRole } from '../models/MediationRole'

import { AriesFrameworkError } from '../../../error'
Expand All @@ -15,6 +16,7 @@ export interface MediationRecordProps {
endpoint?: string
recipientKeys?: string[]
routingKeys?: string[]
pickupStrategy?: MediatorPickupStrategy
tags?: CustomMediationTags
}

Expand All @@ -40,6 +42,7 @@ export class MediationRecord
public endpoint?: string
public recipientKeys!: string[]
public routingKeys!: string[]
public pickupStrategy?: MediatorPickupStrategy

public static readonly type = 'MediationRecord'
public readonly type = MediationRecord.type
Expand All @@ -57,6 +60,7 @@ export class MediationRecord
this.state = props.state
this.role = props.role
this.endpoint = props.endpoint ?? undefined
this.pickupStrategy = props.pickupStrategy
}
}

Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/storage/didcomm/DidCommMessageRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { DidCommMessageRole } from './DidCommMessageRole'

import { AriesFrameworkError } from '../../error'
import { JsonTransformer } from '../../utils/JsonTransformer'
import { rightSplit } from '../../utils/string'
import { parseMessageType } from '../../utils/messageType'
import { isJsonObject } from '../../utils/type'
import { uuid } from '../../utils/uuid'
import { BaseRecord } from '../BaseRecord'
Expand Down Expand Up @@ -61,7 +61,8 @@ export class DidCommMessageRecord extends BaseRecord<DefaultDidCommMessageTags>
public getTags() {
const messageId = this.message['@id'] as string
const messageType = this.message['@type'] as string
const [, protocolName, protocolVersion, messageName] = rightSplit(messageType, '/', 3)

const { protocolName, protocolVersion, messageName } = parseMessageType(messageType)
const [versionMajor, versionMinor] = protocolVersion.split('.')

const thread = this.message['~thread']
Expand Down
21 changes: 21 additions & 0 deletions packages/core/src/utils/__tests__/messageType.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
parseMessageType,
replaceLegacyDidSovPrefix,
replaceLegacyDidSovPrefixOnMessage,
replaceNewDidCommPrefixWithLegacyDidSov,
Expand Down Expand Up @@ -81,4 +82,24 @@ describe('messageType', () => {
)
})
})

describe('parseMessageType()', () => {
test('correctly parses the message type', () => {
expect(parseMessageType('https://didcomm.org/connections/1.0/request')).toEqual({
documentUri: 'https://didcomm.org',
protocolName: 'connections',
protocolVersion: '1.0',
messageName: 'request',
protocolUri: `https://didcomm.org/connections/1.0`,
})

expect(parseMessageType('https://didcomm.org/issue-credential/1.0/propose-credential')).toEqual({
documentUri: 'https://didcomm.org',
protocolName: 'issue-credential',
protocolVersion: '1.0',
messageName: 'propose-credential',
protocolUri: `https://didcomm.org/issue-credential/1.0`,
})
})
})
})
51 changes: 51 additions & 0 deletions packages/core/src/utils/messageType.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,56 @@
import type { PlaintextMessage } from '../types'

import { rightSplit } from './string'

export interface ParsedMessageType {
/**
* Message name
*
* @example request
*/
messageName: string

/**
* Version of the protocol
*
* @example 1.0
*/
protocolVersion: string

/**
* Name of the protocol
*
* @example connections
*/
protocolName: string

/**
* Document uri of the message.
*
* @example https://didcomm.org
*/
documentUri: string

/**
* Uri identifier of the protocol. Includes the
* documentUri, protocolName and protocolVersion.
* Useful when working with feature discovery
*/
protocolUri: string
}

export function parseMessageType(messageType: string): ParsedMessageType {
const [documentUri, protocolName, protocolVersion, messageName] = rightSplit(messageType, '/', 3)

return {
documentUri,
protocolName,
protocolVersion,
messageName,
protocolUri: `${documentUri}/${protocolName}/${protocolVersion}`,
}
}

export function replaceLegacyDidSovPrefixOnMessage(message: PlaintextMessage | Record<string, unknown>) {
message['@type'] = replaceLegacyDidSovPrefix(message['@type'] as string)
}
Expand Down