Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: pickup v2 protocol #711

Merged
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/core/src/agent/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ export class AgentConfig {
return this.initConfig.mediatorPickupStrategy
}

public get maximumMessagePickup() {
return this.initConfig.maximumMessagePickup ?? 10
}

public get endpoints(): [string, ...string[]] {
// if endpoints is not set, return queue endpoint
// https://github.com/hyperledger/aries-rfcs/issues/405#issuecomment-582612875
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { TransportSession } from './TransportService'
import { Lifecycle, scoped } from 'tsyringe'

import { AriesFrameworkError } from '../error'
import { ConnectionRepository } from '../modules/connections'
import { ConnectionRepository } from '../modules/connections/repository'
import { DidRepository } from '../modules/dids/repository/DidRepository'
import { ProblemReportError, ProblemReportMessage, ProblemReportReason } from '../modules/problem-reports'
import { isValidJweStructure } from '../utils/JWE'
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/agent/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { ConnectionRecord } from '../modules/connections'
import type { OutboundMessage, OutboundServiceMessage } from '../types'
import type { AgentMessage } from './AgentMessage'

import { IndyAgentService } from '../modules/dids'
import { DidCommService } from '../modules/dids/domain/service/DidCommService'

export function createOutboundMessage<T extends AgentMessage = AgentMessage>(
Expand All @@ -25,5 +26,6 @@ export function createOutboundServiceMessage<T extends AgentMessage = AgentMessa
export function isOutboundServiceMessage(
message: OutboundMessage | OutboundServiceMessage
): message is OutboundServiceMessage {
return (message as OutboundServiceMessage).service instanceof DidCommService
const service = (message as OutboundServiceMessage).service
return service instanceof IndyAgentService || service instanceof DidCommService
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,79 @@
import type { BaseMessage } from '../../agent/BaseMessage'
import type { AgentMessageProcessedEvent } from '../../agent/Events'

import { firstValueFrom, of, ReplaySubject } from 'rxjs'
import { filter, takeUntil, timeout, catchError, map } from 'rxjs/operators'
import { Lifecycle, scoped } from 'tsyringe'

import { AgentConfig } from '../../agent/AgentConfig'
import { Dispatcher } from '../../agent/Dispatcher'
import { EventEmitter } from '../../agent/EventEmitter'
import { AgentEventTypes } from '../../agent/Events'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { parseMessageType } from '../../utils/messageType'
import { ConnectionService } from '../connections/services'

import { DiscloseMessageHandler, QueryMessageHandler } from './handlers'
import { DiscloseMessage } from './messages'
import { DiscoverFeaturesService } from './services'

@scoped(Lifecycle.ContainerScoped)
export class DiscoverFeaturesModule {
private connectionService: ConnectionService
private messageSender: MessageSender
private discoverFeaturesService: DiscoverFeaturesService
private eventEmitter: EventEmitter
private agentConfig: AgentConfig

public constructor(
dispatcher: Dispatcher,
connectionService: ConnectionService,
messageSender: MessageSender,
discoverFeaturesService: DiscoverFeaturesService
discoverFeaturesService: DiscoverFeaturesService,
eventEmitter: EventEmitter,
agentConfig: AgentConfig
) {
this.connectionService = connectionService
this.messageSender = messageSender
this.discoverFeaturesService = discoverFeaturesService
this.registerHandlers(dispatcher)
this.eventEmitter = eventEmitter
this.agentConfig = agentConfig
}

public async isProtocolSupported(connectionId: string, message: BaseMessage) {
TimoGlastra marked this conversation as resolved.
Show resolved Hide resolved
const { protocolUri } = parseMessageType(message.type)

// Listen for response to our feature query
const replaySubject = new ReplaySubject(1)
this.eventEmitter
.observable<AgentMessageProcessedEvent>(AgentEventTypes.AgentMessageProcessed)
.pipe(
// Stop when the agent shuts down
takeUntil(this.agentConfig.stop$),
// filter by connection id and query disclose message type
filter((e) => e.payload.connection?.id === connectionId && e.payload.message.type === DiscloseMessage.type),
// Return whether the protocol is supported
map((e) => {
const message = e.payload.message as DiscloseMessage
return message.protocols.map((p) => p.protocolId).includes(protocolUri)
}),
// TODO: make configurable
// If we don't have an answer in 7 seconds (no response, not supported, etc...) error
timeout(7000),
// We want to return false if an error occurred
catchError(() => of(false))
)
.subscribe(replaySubject)

await this.queryFeatures(connectionId, {
query: protocolUri,
comment: 'Detect if protocol is supported',
})

const isProtocolSupported = await firstValueFrom(replaySubject)
return isProtocolSupported
}

public async queryFeatures(connectionId: string, options: { query: string; comment?: string }) {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/modules/routing/MediatorModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { Lifecycle, scoped } from 'tsyringe'
import { AgentConfig } from '../../agent/AgentConfig'
import { Dispatcher } from '../../agent/Dispatcher'
import { EventEmitter } from '../../agent/EventEmitter'
import { MessageReceiver } from '../../agent/MessageReceiver'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { ConnectionService } from '../connections/services'
Expand All @@ -29,6 +30,7 @@ export class MediatorModule {
mediationService: MediatorService,
messagePickupService: MessagePickupService,
messageSender: MessageSender,
messageReceiver: MessageReceiver,
eventEmitter: EventEmitter,
agentConfig: AgentConfig,
connectionService: ConnectionService
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
export enum MediatorPickupStrategy {
// Explicit pickup strategy means picking up messages using the pickup protocol
Explicit = 'Explicit',
PickUpV1 = 'PickUpV1',

// Supports pickup v2
PickUpV2 = 'PickUpV2',
KolbyRKunz marked this conversation as resolved.
Show resolved Hide resolved

// Implicit pickup strategy means picking up messages only using return route
// decorator. This is what ACA-Py currently uses
Expand Down
Loading