Skip to content

Commit

Permalink
feat!: initial live mode support + message repository refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Ariel Gentile <gentilester@gmail.com>
  • Loading branch information
genaris committed Jan 29, 2024
1 parent a641a96 commit 7e6970d
Show file tree
Hide file tree
Showing 46 changed files with 924 additions and 278 deletions.
6 changes: 2 additions & 4 deletions packages/core/src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import { JwsService } from '../crypto/JwsService'
import { AriesFrameworkError } from '../error'
import { DependencyManager } from '../plugins'
import { DidCommMessageRepository, StorageUpdateService, StorageVersionRepository } from '../storage'
import { InMemoryMessageRepository } from '../storage/InMemoryMessageRepository'

import { AgentConfig } from './AgentConfig'
import { extendModulesWithDefaultModules } from './AgentModules'
Expand Down Expand Up @@ -90,9 +89,6 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
"Missing required dependency: 'StorageService'. You can register it using one of the provided modules such as the AskarModule or the IndySdkModule, or implement your own."
)
}
if (!dependencyManager.isRegistered(InjectionSymbols.MessageRepository)) {
dependencyManager.registerSingleton(InjectionSymbols.MessageRepository, InMemoryMessageRepository)
}

// TODO: contextCorrelationId for base wallet
// Bind the default agent context to the container for use in modules etc.
Expand Down Expand Up @@ -197,6 +193,8 @@ export class Agent<AgentModules extends AgentModulesInput = any> extends BaseAge
)
await this.mediationRecipient.provision(mediationConnection)
}

await this.messagePickup.initialize()
await this.mediator.initialize()
await this.mediationRecipient.initialize()

Expand Down
22 changes: 14 additions & 8 deletions packages/core/src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,21 @@ import type { TransportSession } from './TransportService'
import type { AgentContext } from './context'
import type { ConnectionRecord } from '../modules/connections'
import type { ResolvedDidCommService } from '../modules/didcomm'
import type { DidDocument } from '../modules/dids'
import type { OutOfBandRecord } from '../modules/oob/repository'
import type { OutboundTransport } from '../transport/OutboundTransport'
import type { OutboundPackage, EncryptedMessage } from '../types'
import type { EncryptedMessage, OutboundPackage } from '../types'

import { DID_COMM_TRANSPORT_QUEUE, InjectionSymbols } from '../constants'
import { ReturnRouteTypes } from '../decorators/transport/TransportDecorator'
import { AriesFrameworkError, MessageSendingError } from '../error'
import { Logger } from '../logger'
import { DidCommDocumentService } from '../modules/didcomm'
import { DidKey, type DidDocument } from '../modules/dids'
import { getKeyFromVerificationMethod } from '../modules/dids/domain/key-type'
import { didKeyToInstanceOfKey } from '../modules/dids/helpers'
import { DidResolverService } from '../modules/dids/services/DidResolverService'
import { MessagePickupRepository } from '../modules/message-pìckup/storage'
import { inject, injectable } from '../plugins'
import { MessageRepository } from '../storage/MessageRepository'
import { MessageValidator } from '../utils/MessageValidator'
import { getProtocolScheme } from '../utils/uri'

Expand All @@ -38,7 +38,7 @@ export interface TransportPriorityOptions {
export class MessageSender {
private envelopeService: EnvelopeService
private transportService: TransportService
private messageRepository: MessageRepository
private messagePickupRepository: MessagePickupRepository
private logger: Logger
private didResolverService: DidResolverService
private didCommDocumentService: DidCommDocumentService
Expand All @@ -48,15 +48,15 @@ export class MessageSender {
public constructor(
envelopeService: EnvelopeService,
transportService: TransportService,
@inject(InjectionSymbols.MessageRepository) messageRepository: MessageRepository,
@inject(InjectionSymbols.MessagePickupRepository) messagePickupRepository: MessagePickupRepository,
@inject(InjectionSymbols.Logger) logger: Logger,
didResolverService: DidResolverService,
didCommDocumentService: DidCommDocumentService,
eventEmitter: EventEmitter
) {
this.envelopeService = envelopeService
this.transportService = transportService
this.messageRepository = messageRepository
this.messagePickupRepository = messagePickupRepository
this.logger = logger
this.didResolverService = didResolverService
this.didCommDocumentService = didCommDocumentService
Expand Down Expand Up @@ -176,7 +176,7 @@ export class MessageSender {
// If the other party shared a queue service endpoint in their did doc we queue the message
if (queueService) {
this.logger.debug(`Queue packed message for connection ${connection.id} (${connection.theirLabel})`)
await this.messageRepository.add(connection.id, encryptedMessage)
await this.messagePickupRepository.addMessage({ connectionId: connection.id, payload: encryptedMessage })
return
}

Expand Down Expand Up @@ -327,7 +327,13 @@ export class MessageSender {
}

const encryptedMessage = await this.envelopeService.packMessage(agentContext, message, keys)
await this.messageRepository.add(connection.id, encryptedMessage)
for (const recipientKey of keys.recipientKeys) {
await this.messagePickupRepository.addMessage({
connectionId: connection.id,
recipientKey: new DidKey(recipientKey).did,
payload: encryptedMessage,
})
}

this.emitMessageSentEvent(outboundMessageContext, OutboundMessageSendStatus.QueuedForPickup)

Expand Down
26 changes: 25 additions & 1 deletion packages/core/src/agent/TransportService.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
import type { AgentMessage } from './AgentMessage'
import type { EnvelopeKeys } from './EnvelopeService'
import type { AgentContext } from './context'
import type { DidDocument } from '../modules/dids'
import type { TransportSessionRemovedEvent, TransportSessionSavedEvent } from '../transport'
import type { EncryptedMessage } from '../types'

import { DID_COMM_TRANSPORT_QUEUE } from '../constants'
import { AriesFrameworkError } from '../error'
import { injectable } from '../plugins'
import { TransportEventTypes } from '../transport'

import { EventEmitter } from './EventEmitter'
import { AgentContext } from './context'

@injectable()
export class TransportService {
public transportSessionTable: TransportSessionTable = {}
private agentContext: AgentContext
private eventEmitter: EventEmitter

public constructor(agentContext: AgentContext, eventEmitter: EventEmitter) {
this.agentContext = agentContext
this.eventEmitter = eventEmitter
}

public saveSession(session: TransportSession) {
if (session.connectionId) {
Expand All @@ -22,6 +33,13 @@ export class TransportService {
})
}
this.transportSessionTable[session.id] = session

this.eventEmitter.emit<TransportSessionSavedEvent>(this.agentContext, {
type: TransportEventTypes.TransportSessionSaved,
payload: {
session,
},
})
}

public findSessionByConnectionId(connectionId: string) {
Expand All @@ -47,6 +65,12 @@ export class TransportService {

public removeSession(session: TransportSession) {
delete this.transportSessionTable[session.id]
this.eventEmitter.emit<TransportSessionRemovedEvent>(this.agentContext, {
type: TransportEventTypes.TransportSessionRemoved,
payload: {
session,
},
})
}

private getExistingSessionsForConnectionIdAndType(connectionId: string, type: string) {
Expand Down
11 changes: 6 additions & 5 deletions packages/core/src/agent/__tests__/Agent.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { ConnectionService } from '../../modules/connections/services/Connection
import { TrustPingService } from '../../modules/connections/services/TrustPingService'
import { CredentialRepository } from '../../modules/credentials'
import { CredentialsApi } from '../../modules/credentials/CredentialsApi'
import { MessagePickupApi } from '../../modules/message-pìckup'
import { MessagePickupApi, InMemoryMessagePickupRepository } from '../../modules/message-pìckup'
import { ProofRepository } from '../../modules/proofs'
import { ProofsApi } from '../../modules/proofs/ProofsApi'
import {
Expand All @@ -24,7 +24,6 @@ import {
MediationRecipientApi,
MediationRecipientModule,
} from '../../modules/routing'
import { InMemoryMessageRepository } from '../../storage/InMemoryMessageRepository'
import { WalletError } from '../../wallet/error'
import { Agent } from '../Agent'
import { Dispatcher } from '../Dispatcher'
Expand Down Expand Up @@ -179,7 +178,9 @@ describe('Agent', () => {

// Symbols, interface based
expect(container.resolve(InjectionSymbols.Logger)).toBe(agentOptions.config.logger)
expect(container.resolve(InjectionSymbols.MessageRepository)).toBeInstanceOf(InMemoryMessageRepository)
expect(container.resolve(InjectionSymbols.MessagePickupRepository)).toBeInstanceOf(
InMemoryMessagePickupRepository
)

// Agent
expect(container.resolve(MessageSender)).toBeInstanceOf(MessageSender)
Expand Down Expand Up @@ -217,8 +218,8 @@ describe('Agent', () => {

// Symbols, interface based
expect(container.resolve(InjectionSymbols.Logger)).toBe(container.resolve(InjectionSymbols.Logger))
expect(container.resolve(InjectionSymbols.MessageRepository)).toBe(
container.resolve(InjectionSymbols.MessageRepository)
expect(container.resolve(InjectionSymbols.MessagePickupRepository)).toBe(
container.resolve(InjectionSymbols.MessagePickupRepository)
)
expect(container.resolve(InjectionSymbols.StorageService)).toBe(
container.resolve(InjectionSymbols.StorageService)
Expand Down
18 changes: 9 additions & 9 deletions packages/core/src/agent/__tests__/MessageSender.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import type { ConnectionRecord } from '../../modules/connections'
import type { ResolvedDidCommService } from '../../modules/didcomm'
import type { DidDocumentService } from '../../modules/dids'
import type { MessageRepository } from '../../storage/MessageRepository'
import type { MessagePickupRepository } from '../../modules/message-pìckup/storage'
import type { OutboundTransport } from '../../transport'
import type { EncryptedMessage } from '../../types'
import type { AgentMessageSentEvent } from '../Events'
Expand All @@ -24,7 +24,7 @@ import { DidCommDocumentService } from '../../modules/didcomm'
import { DidResolverService, DidDocument, VerificationMethod } from '../../modules/dids'
import { DidCommV1Service } from '../../modules/dids/domain/service/DidCommV1Service'
import { verkeyToInstanceOfKey } from '../../modules/dids/helpers'
import { InMemoryMessageRepository } from '../../storage/InMemoryMessageRepository'
import { InMemoryMessagePickupRepository } from '../../modules/message-pìckup/storage'
import { EnvelopeService as EnvelopeServiceImpl } from '../EnvelopeService'
import { EventEmitter } from '../EventEmitter'
import { AgentEventTypes } from '../Events'
Expand Down Expand Up @@ -114,7 +114,7 @@ describe('MessageSender', () => {
sessionWithoutKeys.inboundMessage = inboundMessage
sessionWithoutKeys.send = jest.fn()

const transportService = new TransportService()
const transportService = new TransportService(getAgentContext(), eventEmitter)
const transportServiceFindSessionMock = mockFunction(transportService.findSessionByConnectionId)
const transportServiceFindSessionByIdMock = mockFunction(transportService.findSessionById)
const transportServiceHasInboundEndpoint = mockFunction(transportService.hasInboundEndpoint)
Expand All @@ -132,7 +132,7 @@ describe('MessageSender', () => {

let messageSender: MessageSender
let outboundTransport: OutboundTransport
let messageRepository: MessageRepository
let messagePickupRepository: MessagePickupRepository
let connection: ConnectionRecord
let outboundMessageContext: OutboundMessageContext
const agentConfig = getAgentConfig('MessageSender')
Expand All @@ -147,11 +147,11 @@ describe('MessageSender', () => {
eventEmitter.on<AgentMessageSentEvent>(AgentEventTypes.AgentMessageSent, eventListenerMock)

outboundTransport = new DummyHttpOutboundTransport()
messageRepository = new InMemoryMessageRepository(agentConfig.logger)
messagePickupRepository = new InMemoryMessagePickupRepository(agentConfig.logger)
messageSender = new MessageSender(
enveloperService,
transportService,
messageRepository,
messagePickupRepository,
logger,
didResolverService,
didCommDocumentService,
Expand Down Expand Up @@ -496,7 +496,7 @@ describe('MessageSender', () => {
messageSender = new MessageSender(
enveloperService,
transportService,
new InMemoryMessageRepository(agentConfig.logger),
new InMemoryMessagePickupRepository(agentConfig.logger),
logger,
didResolverService,
didCommDocumentService,
Expand Down Expand Up @@ -635,11 +635,11 @@ describe('MessageSender', () => {
describe('packMessage', () => {
beforeEach(() => {
outboundTransport = new DummyHttpOutboundTransport()
messageRepository = new InMemoryMessageRepository(agentConfig.logger)
messagePickupRepository = new InMemoryMessagePickupRepository(agentConfig.logger)
messageSender = new MessageSender(
enveloperService,
transportService,
messageRepository,
messagePickupRepository,
logger,
didResolverService,
didCommDocumentService,
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/agent/__tests__/TransportService.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import { getMockConnection } from '../../../tests/helpers'
import { Subject } from 'rxjs'

import { agentDependencies, getAgentContext, getMockConnection } from '../../../tests/helpers'
import { DidExchangeRole } from '../../modules/connections'
import { EventEmitter } from '../EventEmitter'
import { TransportService } from '../TransportService'

import { DummyTransportSession } from './stubs'
Expand All @@ -9,7 +12,7 @@ describe('TransportService', () => {
let transportService: TransportService

beforeEach(() => {
transportService = new TransportService()
transportService = new TransportService(getAgentContext(), new EventEmitter(agentDependencies, new Subject()))
})

test(`remove session saved for a given connection`, () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/constants.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
export const InjectionSymbols = {
MessageRepository: Symbol('MessageRepository'),
MessagePickupRepository: Symbol('MessagePickupRepository'),
StorageService: Symbol('StorageService'),
Logger: Symbol('Logger'),
AgentContextProvider: Symbol('AgentContextProvider'),
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export { DidCommMimeType, KeyDerivationMethod } from './types'
export type { FileSystem, DownloadToFileOptions } from './storage/FileSystem'
export * from './storage/BaseRecord'
export { DidCommMessageRecord, DidCommMessageRole, DidCommMessageRepository } from './storage/didcomm'
export { InMemoryMessageRepository } from './storage/InMemoryMessageRepository'
export { InMemoryMessagePickupRepository } from './modules/message-pìckup'
export { Repository } from './storage/Repository'
export * from './storage/RepositoryEvents'
export { StorageService, Query, SimpleQuery, BaseRecordConstructor } from './storage/StorageService'
Expand Down
Loading

0 comments on commit 7e6970d

Please sign in to comment.