Skip to content

Commit

Permalink
Merge branch 'main' into key-rotate
Browse files Browse the repository at this point in the history
  • Loading branch information
TimoGlastra committed Mar 30, 2022
2 parents 34b1a70 + 1b01bce commit 2ed57b1
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 22 deletions.
3 changes: 1 addition & 2 deletions packages/core/src/agent/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { AriesFrameworkError } from '../error'
import { ConsoleLogger, LogLevel } from '../logger'
import { AutoAcceptCredential } from '../modules/credentials/CredentialAutoAcceptType'
import { AutoAcceptProof } from '../modules/proofs/ProofAutoAcceptType'
import { MediatorPickupStrategy } from '../modules/routing/MediatorPickupStrategy'
import { DidCommMimeType } from '../types'

export class AgentConfig {
Expand Down Expand Up @@ -77,7 +76,7 @@ export class AgentConfig {
}

public get mediatorPickupStrategy() {
return this.initConfig.mediatorPickupStrategy ?? MediatorPickupStrategy.Explicit
return this.initConfig.mediatorPickupStrategy
}

public get endpoints(): [string, ...string[]] {
Expand Down
17 changes: 12 additions & 5 deletions packages/core/src/agent/MessageReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { AriesFrameworkError } from '../error'
import { ConnectionRepository } from '../modules/connections'
import { DidRepository } from '../modules/dids/repository/DidRepository'
import { ProblemReportError, ProblemReportMessage, ProblemReportReason } from '../modules/problem-reports'
import { isValidJweStructure } from '../utils/JWE'
import { JsonTransformer } from '../utils/JsonTransformer'
import { MessageValidator } from '../utils/MessageValidator'
import { replaceLegacyDidSovPrefixOnMessage } from '../utils/messageType'
Expand Down Expand Up @@ -66,11 +67,12 @@ export class MessageReceiver {
*/
public async receiveMessage(inboundMessage: unknown, session?: TransportSession) {
this.logger.debug(`Agent ${this.config.label} received message`)

if (this.isPlaintextMessage(inboundMessage)) {
if (this.isEncryptedMessage(inboundMessage)) {
await this.receiveEncryptedMessage(inboundMessage as EncryptedMessage, session)
} else if (this.isPlaintextMessage(inboundMessage)) {
await this.receivePlaintextMessage(inboundMessage)
} else {
await this.receiveEncryptedMessage(inboundMessage as EncryptedMessage, session)
throw new AriesFrameworkError('Unable to parse incoming message: unrecognized format')
}
}

Expand Down Expand Up @@ -143,12 +145,17 @@ export class MessageReceiver {

private isPlaintextMessage(message: unknown): message is PlaintextMessage {
if (typeof message !== 'object' || message == null) {
throw new AriesFrameworkError('Invalid message received. Message should be object')
return false
}
// If the message does have an @type field we assume the message is in plaintext and it is not encrypted.
// If the message has a @type field we assume the message is in plaintext and it is not encrypted.
return '@type' in message
}

private isEncryptedMessage(message: unknown): message is EncryptedMessage {
// If the message does has valid JWE structure, we can assume the message is encrypted.
return isValidJweStructure(message)
}

private async transformAndValidate(
plaintextMessage: PlaintextMessage,
connection?: ConnectionRecord | null
Expand Down
78 changes: 73 additions & 5 deletions packages/core/src/modules/routing/RecipientModule.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,27 @@
import type { AgentMessageProcessedEvent } from '../../agent/Events'
import type { Logger } from '../../logger'
import type { OutboundWebSocketClosedEvent } from '../../transport'
import type { OutboundMessage } from '../../types'
import type { ConnectionRecord } from '../connections'
import type { MediationStateChangedEvent } from './RoutingEvents'
import type { MediationRecord } from './index'

import { firstValueFrom, interval, ReplaySubject, timer } from 'rxjs'
import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen } from 'rxjs/operators'
import { firstValueFrom, interval, of, ReplaySubject, timer } from 'rxjs'
import { filter, first, takeUntil, throttleTime, timeout, tap, delayWhen, catchError, map } from 'rxjs/operators'
import { Lifecycle, scoped } from 'tsyringe'

import { AgentConfig } from '../../agent/AgentConfig'
import { Dispatcher } from '../../agent/Dispatcher'
import { EventEmitter } from '../../agent/EventEmitter'
import { AgentEventTypes } from '../../agent/Events'
import { MessageSender } from '../../agent/MessageSender'
import { createOutboundMessage } from '../../agent/helpers'
import { AriesFrameworkError } from '../../error'
import { TransportEventTypes } from '../../transport'
import { parseMessageType } from '../../utils/messageType'
import { ConnectionInvitationMessage } from '../connections'
import { ConnectionService } from '../connections/services'
import { DiscloseMessage, DiscoverFeaturesModule } from '../discover-features'

import { MediatorPickupStrategy } from './MediatorPickupStrategy'
import { RoutingEventTypes } from './RoutingEvents'
Expand All @@ -26,6 +30,7 @@ import { MediationDenyHandler } from './handlers/MediationDenyHandler'
import { MediationGrantHandler } from './handlers/MediationGrantHandler'
import { BatchPickupMessage } from './messages/BatchPickupMessage'
import { MediationState } from './models/MediationState'
import { MediationRepository } from './repository'
import { MediationRecipientService } from './services/MediationRecipientService'

@scoped(Lifecycle.ContainerScoped)
Expand All @@ -36,21 +41,27 @@ export class RecipientModule {
private messageSender: MessageSender
private eventEmitter: EventEmitter
private logger: Logger
private discoverFeaturesModule: DiscoverFeaturesModule
private mediationRepository: MediationRepository

public constructor(
dispatcher: Dispatcher,
agentConfig: AgentConfig,
mediationRecipientService: MediationRecipientService,
connectionService: ConnectionService,
messageSender: MessageSender,
eventEmitter: EventEmitter
eventEmitter: EventEmitter,
discoverFeaturesModule: DiscoverFeaturesModule,
mediationRepository: MediationRepository
) {
this.agentConfig = agentConfig
this.connectionService = connectionService
this.mediationRecipientService = mediationRecipientService
this.messageSender = messageSender
this.eventEmitter = eventEmitter
this.logger = agentConfig.logger
this.discoverFeaturesModule = discoverFeaturesModule
this.mediationRepository = mediationRepository
this.registerHandlers(dispatcher)
}

Expand Down Expand Up @@ -153,8 +164,8 @@ export class RecipientModule {
}

public async initiateMessagePickup(mediator: MediationRecord) {
const { mediatorPickupStrategy, mediatorPollingInterval } = this.agentConfig

const { mediatorPollingInterval } = this.agentConfig
const mediatorPickupStrategy = await this.getPickupStrategyForMediator(mediator)
const mediatorConnection = await this.connectionService.getById(mediator.connectionId)

// Explicit means polling every X seconds with batch message
Expand All @@ -181,6 +192,63 @@ export class RecipientModule {
}
}

private async getPickupStrategyForMediator(mediator: MediationRecord) {
let mediatorPickupStrategy = mediator.pickupStrategy ?? this.agentConfig.mediatorPickupStrategy

// If mediator pickup strategy is not configured we try to query if batch pickup
// is supported through the discover features protocol
if (!mediatorPickupStrategy) {
const isBatchPickupSupported = await this.isBatchPickupSupportedByMediator(mediator)

// Use explicit pickup strategy
mediatorPickupStrategy = isBatchPickupSupported
? MediatorPickupStrategy.Explicit
: MediatorPickupStrategy.Implicit

// Store the result so it can be reused next time
mediator.pickupStrategy = mediatorPickupStrategy
await this.mediationRepository.update(mediator)
}

return mediatorPickupStrategy
}

private async isBatchPickupSupportedByMediator(mediator: MediationRecord) {
const { protocolUri } = parseMessageType(BatchPickupMessage.type)

// Listen for response to our feature query
const replaySubject = new ReplaySubject(1)
this.eventEmitter
.observable<AgentMessageProcessedEvent>(AgentEventTypes.AgentMessageProcessed)
.pipe(
// Stop when the agent shuts down
takeUntil(this.agentConfig.stop$),
// filter by mediator connection id and query disclose message type
filter(
(e) => e.payload.connection?.id === mediator.connectionId && e.payload.message.type === DiscloseMessage.type
),
// Return whether the protocol is supported
map((e) => {
const message = e.payload.message as DiscloseMessage
return message.protocols.map((p) => p.protocolId).includes(protocolUri)
}),
// TODO: make configurable
// If we don't have an answer in 7 seconds (no response, not supported, etc...) error
timeout(7000),
// We want to return false if an error occurred
catchError(() => of(false))
)
.subscribe(replaySubject)

await this.discoverFeaturesModule.queryFeatures(mediator.connectionId, {
query: protocolUri,
comment: 'Detect if batch pickup is supported to determine pickup strategy for messages',
})

const isBatchPickupSupported = await firstValueFrom(replaySubject)
return isBatchPickupSupported
}

public async discoverMediation() {
return this.mediationRecipientService.discoverMediation()
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/modules/routing/__tests__/mediation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { SubjectOutboundTransport } from '../../../../../../tests/transport/Subj
import { getBaseConfig, waitForBasicMessage } from '../../../../tests/helpers'
import { Agent } from '../../../agent/Agent'
import { ConnectionRecord } from '../../connections'
import { MediatorPickupStrategy } from '../MediatorPickupStrategy'
import { MediationState } from '../models/MediationState'

const recipientConfig = getBaseConfig('Mediation: Recipient')
Expand Down Expand Up @@ -93,6 +94,7 @@ describe('mediator establishment', () => {
expect(recipientMediatorConnection).toBeConnectedWith(mediatorRecipientConnection)

expect(recipientMediator?.state).toBe(MediationState.Granted)
expect(recipientMediator?.pickupStrategy).toBe(MediatorPickupStrategy.Explicit)

// Initialize sender agent
senderAgent = new Agent(senderConfig.config, senderConfig.agentDependencies)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { MediatorPickupStrategy } from '../MediatorPickupStrategy'
import type { MediationRole } from '../models/MediationRole'

import { AriesFrameworkError } from '../../../error'
Expand All @@ -15,6 +16,7 @@ export interface MediationRecordProps {
endpoint?: string
recipientKeys?: string[]
routingKeys?: string[]
pickupStrategy?: MediatorPickupStrategy
tags?: CustomMediationTags
}

Expand All @@ -40,6 +42,7 @@ export class MediationRecord
public endpoint?: string
public recipientKeys!: string[]
public routingKeys!: string[]
public pickupStrategy?: MediatorPickupStrategy

public static readonly type = 'MediationRecord'
public readonly type = MediationRecord.type
Expand All @@ -57,6 +60,8 @@ export class MediationRecord
this.state = props.state
this.role = props.role
this.endpoint = props.endpoint ?? undefined
this.pickupStrategy = props.pickupStrategy
this._tags = props.tags ?? {}
}
}

Expand Down
5 changes: 3 additions & 2 deletions packages/core/src/storage/didcomm/DidCommMessageRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import type { DidCommMessageRole } from './DidCommMessageRole'

import { AriesFrameworkError } from '../../error'
import { JsonTransformer } from '../../utils/JsonTransformer'
import { rightSplit } from '../../utils/string'
import { parseMessageType } from '../../utils/messageType'
import { isJsonObject } from '../../utils/type'
import { uuid } from '../../utils/uuid'
import { BaseRecord } from '../BaseRecord'
Expand Down Expand Up @@ -61,7 +61,8 @@ export class DidCommMessageRecord extends BaseRecord<DefaultDidCommMessageTags>
public getTags() {
const messageId = this.message['@id'] as string
const messageType = this.message['@type'] as string
const [, protocolName, protocolVersion, messageName] = rightSplit(messageType, '/', 3)

const { protocolName, protocolVersion, messageName } = parseMessageType(messageType)
const [versionMajor, versionMinor] = protocolVersion.split('.')

const thread = this.message['~thread']
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/transport/HttpOutboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { AbortController } from 'abort-controller'

import { AgentConfig } from '../agent/AgentConfig'
import { AriesFrameworkError } from '../error/AriesFrameworkError'
import { isValidJweStucture, JsonEncoder } from '../utils'
import { isValidJweStructure, JsonEncoder } from '../utils'

export class HttpOutboundTransport implements OutboundTransport {
private agent!: Agent
Expand Down Expand Up @@ -78,7 +78,7 @@ export class HttpOutboundTransport implements OutboundTransport {

try {
const encryptedMessage = JsonEncoder.fromString(responseMessage)
if (!isValidJweStucture(encryptedMessage)) {
if (!isValidJweStructure(encryptedMessage)) {
this.logger.error(
`Received a response from the other agent but the structure of the incoming message is not a DIDComm message: ${responseMessage}`
)
Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/transport/WsOutboundTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type WebSocket from 'ws'
import { AgentConfig } from '../agent/AgentConfig'
import { EventEmitter } from '../agent/EventEmitter'
import { AriesFrameworkError } from '../error/AriesFrameworkError'
import { isValidJweStucture, JsonEncoder } from '../utils'
import { isValidJweStructure, JsonEncoder } from '../utils'
import { Buffer } from '../utils/buffer'

import { TransportEventTypes } from './TransportEventTypes'
Expand Down Expand Up @@ -103,7 +103,7 @@ export class WsOutboundTransport implements OutboundTransport {
private handleMessageEvent = (event: any) => {
this.logger.trace('WebSocket message event received.', { url: event.target.url, data: event.data })
const payload = JsonEncoder.fromBuffer(event.data)
if (!isValidJweStucture(payload)) {
if (!isValidJweStructure(payload)) {
throw new Error(
`Received a response from the other agent but the structure of the incoming message is not a DIDComm message: ${payload}`
)
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/utils/JWE.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { EncryptedMessage } from '../types'

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function isValidJweStucture(message: any): message is EncryptedMessage {
export function isValidJweStructure(message: any): message is EncryptedMessage {
return message && typeof message === 'object' && message.protected && message.iv && message.ciphertext && message.tag
}
6 changes: 3 additions & 3 deletions packages/core/src/utils/__tests__/JWE.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { isValidJweStucture } from '../JWE'
import { isValidJweStructure } from '../JWE'

describe('ValidJWEStructure', () => {
test('throws error when the response message has an invalid JWE structure', async () => {
const responseMessage = 'invalid JWE structure'
await expect(isValidJweStucture(responseMessage)).toBeFalsy()
await expect(isValidJweStructure(responseMessage)).toBeFalsy()
})

test('valid JWE structure', async () => {
Expand All @@ -14,6 +14,6 @@ describe('ValidJWEStructure', () => {
ciphertext: 'mwRMpVg9wkF4rIZcBeWLcc0fWhs=',
tag: '0yW0Lx8-vWevj3if91R06g==',
}
await expect(isValidJweStucture(responseMessage)).toBeTruthy()
await expect(isValidJweStructure(responseMessage)).toBeTruthy()
})
})
21 changes: 21 additions & 0 deletions packages/core/src/utils/__tests__/messageType.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
parseMessageType,
replaceLegacyDidSovPrefix,
replaceLegacyDidSovPrefixOnMessage,
replaceNewDidCommPrefixWithLegacyDidSov,
Expand Down Expand Up @@ -81,4 +82,24 @@ describe('messageType', () => {
)
})
})

describe('parseMessageType()', () => {
test('correctly parses the message type', () => {
expect(parseMessageType('https://didcomm.org/connections/1.0/request')).toEqual({
documentUri: 'https://didcomm.org',
protocolName: 'connections',
protocolVersion: '1.0',
messageName: 'request',
protocolUri: `https://didcomm.org/connections/1.0`,
})

expect(parseMessageType('https://didcomm.org/issue-credential/1.0/propose-credential')).toEqual({
documentUri: 'https://didcomm.org',
protocolName: 'issue-credential',
protocolVersion: '1.0',
messageName: 'propose-credential',
protocolUri: `https://didcomm.org/issue-credential/1.0`,
})
})
})
})
Loading

0 comments on commit 2ed57b1

Please sign in to comment.