Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
TimoGlastra authored Aug 30, 2022
2 parents 66118ba + 69d4906 commit 90ad5b3
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 52 deletions.
95 changes: 66 additions & 29 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import type { MediationStateChangedEvent } from './RoutingEvents'
import type { MediationRecord } from './index'
import type { GetRoutingOptions } from './services/RoutingService'

import { firstValueFrom, interval, ReplaySubject, timer } from 'rxjs'
import { firstValueFrom, interval, merge, ReplaySubject, Subject, timer } from 'rxjs'
import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen } from 'rxjs/operators'

import { AgentConfig } from '../../agent/AgentConfig'
Expand Down Expand Up @@ -47,6 +47,9 @@ export class RecipientModule {
private mediationRepository: MediationRepository
private routingService: RoutingService

// stopMessagePickup$ is used for stop message pickup signal
private readonly stopMessagePickup$ = new Subject<boolean>()

public constructor(
dispatcher: Dispatcher,
agentConfig: AgentConfig,
Expand Down Expand Up @@ -144,11 +147,13 @@ export class RecipientModule {
// in a recursive back off strategy if it matches the following criteria:
// - Agent is not shutdown
// - Socket was for current mediator connection id

const stopConditions$ = merge(this.agentConfig.stop$, this.stopMessagePickup$).pipe()
this.eventEmitter
.observable<OutboundWebSocketClosedEvent>(TransportEventTypes.OutboundWebSocketClosedEvent)
.pipe(
// Stop when the agent shuts down
takeUntil(this.agentConfig.stop$),
// Stop when the agent shuts down or stop message pickup signal is received
takeUntil(stopConditions$),
filter((e) => e.payload.connectionId === mediator.connectionId),
// Make sure we're not reconnecting multiple times
throttleTime(interval),
Expand All @@ -157,20 +162,23 @@ export class RecipientModule {
// Wait for interval time before reconnecting
delayWhen(() => timer(interval))
)
.subscribe(async () => {
this.logger.debug(
`Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...`
)
try {
if (pickupStrategy === MediatorPickupStrategy.PickUpV2) {
// Start Pickup v2 protocol to receive messages received while websocket offline
await this.sendStatusRequest({ mediatorId: mediator.id })
} else {
await this.openMediationWebSocket(mediator)
.subscribe({
next: async () => {
this.logger.debug(
`Websocket connection to mediator with connectionId '${mediator.connectionId}' is closed, attempting to reconnect...`
)
try {
if (pickupStrategy === MediatorPickupStrategy.PickUpV2) {
// Start Pickup v2 protocol to receive messages received while websocket offline
await this.sendStatusRequest({ mediatorId: mediator.id })
} else {
await this.openMediationWebSocket(mediator)
}
} catch (error) {
this.logger.warn('Unable to re-open websocket connection to mediator', { error })
}
} catch (error) {
this.logger.warn('Unable to re-open websocket connection to mediator', { error })
}
},
complete: () => this.agentConfig.logger.info(`Stopping pickup of messages from mediator '${mediator.id}'`),
})
try {
if (pickupStrategy === MediatorPickupStrategy.Implicit) {
Expand All @@ -181,40 +189,69 @@ export class RecipientModule {
}
}

public async initiateMessagePickup(mediator: MediationRecord) {
/**
* Start a Message Pickup flow with a registered Mediator.
*
* @param mediator optional {MediationRecord} corresponding to the mediator to pick messages from. It will use
* default mediator otherwise
* @param pickupStrategy optional {MediatorPickupStrategy} to use in the loop. It will use Agent's default
* strategy or attempt to find it by Discover Features otherwise
* @returns
*/
public async initiateMessagePickup(mediator?: MediationRecord, pickupStrategy?: MediatorPickupStrategy) {
const { mediatorPollingInterval } = this.agentConfig
const mediatorPickupStrategy = await this.getPickupStrategyForMediator(mediator)
const mediatorConnection = await this.connectionService.getById(mediator.connectionId)

const mediatorRecord = mediator ?? (await this.findDefaultMediator())
if (!mediatorRecord) {
throw new AriesFrameworkError('There is no mediator to pickup messages from')
}

const mediatorPickupStrategy = pickupStrategy ?? (await this.getPickupStrategyForMediator(mediatorRecord))
const mediatorConnection = await this.connectionService.getById(mediatorRecord.connectionId)

switch (mediatorPickupStrategy) {
case MediatorPickupStrategy.PickUpV2:
this.agentConfig.logger.info(`Starting pickup of messages from mediator '${mediator.id}'`)
await this.openWebSocketAndPickUp(mediator, mediatorPickupStrategy)
await this.sendStatusRequest({ mediatorId: mediator.id })
this.agentConfig.logger.info(`Starting pickup of messages from mediator '${mediatorRecord.id}'`)
await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy)
await this.sendStatusRequest({ mediatorId: mediatorRecord.id })
break
case MediatorPickupStrategy.PickUpV1: {
const stopConditions$ = merge(this.agentConfig.stop$, this.stopMessagePickup$).pipe()
// Explicit means polling every X seconds with batch message
this.agentConfig.logger.info(`Starting explicit (batch) pickup of messages from mediator '${mediator.id}'`)
this.agentConfig.logger.info(
`Starting explicit (batch) pickup of messages from mediator '${mediatorRecord.id}'`
)
const subscription = interval(mediatorPollingInterval)
.pipe(takeUntil(this.agentConfig.stop$))
.subscribe(async () => {
await this.pickupMessages(mediatorConnection)
.pipe(takeUntil(stopConditions$))
.subscribe({
next: async () => {
await this.pickupMessages(mediatorConnection)
},
complete: () =>
this.agentConfig.logger.info(`Stopping pickup of messages from mediator '${mediatorRecord.id}'`),
})
return subscription
}
case MediatorPickupStrategy.Implicit:
// Implicit means sending ping once and keeping connection open. This requires a long-lived transport
// such as WebSockets to work
this.agentConfig.logger.info(`Starting implicit pickup of messages from mediator '${mediator.id}'`)
await this.openWebSocketAndPickUp(mediator, mediatorPickupStrategy)
this.agentConfig.logger.info(`Starting implicit pickup of messages from mediator '${mediatorRecord.id}'`)
await this.openWebSocketAndPickUp(mediatorRecord, mediatorPickupStrategy)
break
default:
this.agentConfig.logger.info(
`Skipping pickup of messages from mediator '${mediator.id}' due to pickup strategy none`
`Skipping pickup of messages from mediator '${mediatorRecord.id}' due to pickup strategy none`
)
}
}

/**
* Terminate all ongoing Message Pickup loops
*/
public async stopMessagePickup() {
this.stopMessagePickup$.next(true)
}

private async sendStatusRequest(config: { mediatorId: string; recipientKey?: string }) {
const mediationRecord = await this.mediationRecipientService.getById(config.mediatorId)

Expand Down
7 changes: 0 additions & 7 deletions packages/core/src/modules/routing/__tests__/mediation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import { SubjectInboundTransport } from '../../../../../../tests/transport/Subje
import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport'
import { getBaseConfig, waitForBasicMessage } from '../../../../tests/helpers'
import { Agent } from '../../../agent/Agent'
import { sleep } from '../../../utils/sleep'
import { ConnectionRecord, HandshakeProtocol } from '../../connections'
import { MediatorPickupStrategy } from '../MediatorPickupStrategy'
import { MediationState } from '../models/MediationState'
Expand All @@ -32,12 +31,6 @@ describe('mediator establishment', () => {
let senderAgent: Agent

afterEach(async () => {
// We want to stop the mediator polling before the agent is shutdown.
// FIXME: add a way to stop mediator polling from the public api, and make sure this is
// being handled in the agent shutdown so we don't get any errors with wallets being closed.
recipientAgent.config.stop$.next(true)
await sleep(1000)

await recipientAgent?.shutdown()
await recipientAgent?.wallet.delete()
await mediatorAgent?.shutdown()
Expand Down
8 changes: 2 additions & 6 deletions packages/core/src/modules/routing/__tests__/pickup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,17 @@ import { SubjectInboundTransport } from '../../../../../../tests/transport/Subje
import { SubjectOutboundTransport } from '../../../../../../tests/transport/SubjectOutboundTransport'
import { getBaseConfig, waitForBasicMessage } from '../../../../tests/helpers'
import { Agent } from '../../../agent/Agent'
import { ConsoleLogger, LogLevel } from '../../../logger'
import { HandshakeProtocol } from '../../connections'
import { MediatorPickupStrategy } from '../MediatorPickupStrategy'

const logger = new ConsoleLogger(LogLevel.info)
const recipientConfig = getBaseConfig('Mediation: Recipient', {
const recipientConfig = getBaseConfig('Pickup: Recipient', {
autoAcceptConnections: true,
indyLedgers: [],
logger,
})
const mediatorConfig = getBaseConfig('Mediation: Mediator', {
const mediatorConfig = getBaseConfig('Pickup: Mediator', {
autoAcceptConnections: true,
endpoints: ['rxjs:mediator'],
indyLedgers: [],
logger,
})

describe('E2E Pick Up protocol', () => {
Expand Down
9 changes: 2 additions & 7 deletions packages/core/tests/connectionless-proofs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
} from '../src/modules/proofs'
import { MediatorPickupStrategy } from '../src/modules/routing'
import { LinkedAttachment } from '../src/utils/LinkedAttachment'
import { sleep } from '../src/utils/sleep'
import { uuid } from '../src/utils/uuid'

import {
Expand Down Expand Up @@ -342,11 +341,7 @@ describe('Present Proof', () => {
state: ProofState.Done,
})

// We want to stop the mediator polling before the agent is shutdown.
// FIXME: add a way to stop mediator polling from the public api, and make sure this is
// being handled in the agent shutdown so we don't get any errors with wallets being closed.
faberAgent.config.stop$.next(true)
aliceAgent.config.stop$.next(true)
await sleep(2000)
await aliceAgent.mediationRecipient.stopMessagePickup()
await faberAgent.mediationRecipient.stopMessagePickup()
})
})
4 changes: 1 addition & 3 deletions tests/e2e-test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ export async function e2eTest({
expect(verifierProof.state).toBe(ProofState.Done)

// We want to stop the mediator polling before the agent is shutdown.
// FIXME: add a way to stop mediator polling from the public api, and make sure this is
// being handled in the agent shutdown so we don't get any errors with wallets being closed.
recipientAgent.config.stop$.next(true)
await recipientAgent.mediationRecipient.stopMessagePickup()
await sleep(2000)
}

0 comments on commit 90ad5b3

Please sign in to comment.