Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make a connection with mediator asynchronously #231

Merged
merged 4 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions samples/__tests__/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { OutboundPackage, InitConfig } from '../../src/types'
import { get, post } from '../http'
import { sleep, toBeConnectedWith, waitForBasicMessage } from '../../src/__tests__/helpers'
import indy from 'indy-sdk'
import testLogger from '../../src/__tests__/logger'
import logger from '../../src/__tests__/logger'

expect.extend({ toBeConnectedWith })

Expand All @@ -13,7 +13,7 @@ const aliceConfig: InitConfig = {
walletConfig: { id: 'e2e-alice' },
walletCredentials: { key: '00000000000000000000000000000Test01' },
autoAcceptConnections: true,
logger: testLogger,
logger: logger,
indy,
}

Expand All @@ -23,7 +23,7 @@ const bobConfig: InitConfig = {
walletConfig: { id: 'e2e-bob' },
walletCredentials: { key: '00000000000000000000000000000Test02' },
autoAcceptConnections: true,
logger: testLogger,
logger: logger,
indy,
}

Expand All @@ -44,26 +44,26 @@ describe('with mediator', () => {
})

test('Alice and Bob make a connection with mediator', async () => {
const aliceAgentSender = new HttpOutboundTransporter()
const aliceAgentReceiver = new PollingInboundTransporter()
const bobAgentSender = new HttpOutboundTransporter()
const bobAgentReceiver = new PollingInboundTransporter()

aliceAgent = new Agent(aliceConfig, aliceAgentReceiver, aliceAgentSender)
aliceAgent = new Agent(aliceConfig, aliceAgentReceiver)
aliceAgent.setOutboundTransporter(new HttpOutboundTransporter(aliceAgent))
await aliceAgent.init()

bobAgent = new Agent(bobConfig, bobAgentReceiver, bobAgentSender)
bobAgent = new Agent(bobConfig, bobAgentReceiver)
bobAgent.setOutboundTransporter(new HttpOutboundTransporter(bobAgent))
await bobAgent.init()

const aliceInbound = aliceAgent.routing.getInboundConnection()
const aliceInboundConnection = aliceInbound?.connection
const aliceKeyAtAliceMediator = aliceInboundConnection?.verkey
testLogger.test('aliceInboundConnection', aliceInboundConnection)
logger.test('aliceInboundConnection', aliceInboundConnection)

const bobInbound = bobAgent.routing.getInboundConnection()
const bobInboundConnection = bobInbound?.connection
const bobKeyAtBobMediator = bobInboundConnection?.verkey
testLogger.test('bobInboundConnection', bobInboundConnection)
logger.test('bobInboundConnection', bobInboundConnection)

// TODO This endpoint currently exists at mediator only for the testing purpose. It returns mediator's part of the pairwise connection.
const mediatorConnectionAtAliceMediator = JSON.parse(
Expand All @@ -73,8 +73,8 @@ describe('with mediator', () => {
await get(`${bobAgent.getMediatorUrl()}/api/connections/${bobKeyAtBobMediator}`)
)

testLogger.test('mediatorConnectionAtAliceMediator', mediatorConnectionAtAliceMediator)
testLogger.test('mediatorConnectionAtBobMediator', mediatorConnectionAtBobMediator)
logger.test('mediatorConnectionAtAliceMediator', mediatorConnectionAtAliceMediator)
logger.test('mediatorConnectionAtBobMediator', mediatorConnectionAtBobMediator)

expect(aliceInboundConnection).toBeConnectedWith(mediatorConnectionAtAliceMediator)
expect(bobInboundConnection).toBeConnectedWith(mediatorConnectionAtBobMediator)
Expand Down Expand Up @@ -104,7 +104,7 @@ describe('with mediator', () => {
throw new Error(`There is no connection for id ${aliceAtAliceBobId}`)
}

testLogger.test('aliceConnectionAtAliceBob\n', aliceConnectionAtAliceBob)
logger.test('aliceConnectionAtAliceBob\n', aliceConnectionAtAliceBob)

const message = 'hello, world'
await aliceAgent.basicMessages.sendMessage(aliceConnectionAtAliceBob, message)
Expand Down Expand Up @@ -143,7 +143,7 @@ class PollingInboundTransporter implements InboundTransporter {
while (!this.stop) {
const downloadedMessages = await agent.routing.downloadMessages()
const messages = [...downloadedMessages]
testLogger.test('downloaded messages', messages)
logger.test('downloaded messages', messages)
while (messages && messages.length > 0) {
const message = messages.shift()
await agent.receiveMessage(message)
Expand All @@ -159,22 +159,42 @@ class PollingInboundTransporter implements InboundTransporter {
}

class HttpOutboundTransporter implements OutboundTransporter {
private agent: Agent

public constructor(agent: Agent) {
this.agent = agent
}
public async sendMessage(outboundPackage: OutboundPackage, receiveReply: boolean) {
const { payload, endpoint } = outboundPackage

if (!endpoint) {
throw new Error(`Missing endpoint. I don't know how and where to send the message.`)
}

testLogger.test(`Sending outbound message to connection ${outboundPackage.connection.id}`, outboundPackage.payload)
logger.test(`Sending outbound message to connection ${outboundPackage.connection.id}`, outboundPackage.payload)

if (receiveReply) {
const response = await post(`${endpoint}`, JSON.stringify(payload))
logger.test(`received response:\n ${response}`)
const wireMessage = JSON.parse(response)
testLogger.test('received response', wireMessage)
return wireMessage
this.agent.receiveMessage(wireMessage)
TimoGlastra marked this conversation as resolved.
Show resolved Hide resolved
} else {
await post(`${endpoint}`, JSON.stringify(payload))
}
}

public async sendAndReceiveMessage(outboundPackage: OutboundPackage) {
const { payload, endpoint } = outboundPackage

if (!endpoint) {
throw new Error(`Missing endpoint. I don't know how and where to send the message.`)
}

logger.test(`Sending outbound message to connection ${outboundPackage.connection.id}`, outboundPackage.payload)

const response = await post(`${endpoint}`, JSON.stringify(payload))
const wireMessage = JSON.parse(response)
logger.test('received response', wireMessage)
return wireMessage
}
}
7 changes: 6 additions & 1 deletion samples/mediator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class StorageOutboundTransporter implements OutboundTransporter {

this.messageRepository.save(connection.theirKey, payload)
}

public sendAndReceiveMessage(outboundPackage: OutboundPackage): Promise<any> {
throw new Error('Method not implemented.')
}
}

const PORT = config.port
Expand All @@ -67,7 +71,8 @@ app.set('json spaces', 2)
const messageRepository = new InMemoryMessageRepository()
const messageSender = new StorageOutboundTransporter(messageRepository)
const messageReceiver = new HttpInboundTransporter(app)
const agent = new Agent(config, messageReceiver, messageSender, messageRepository)
const agent = new Agent(config, messageReceiver, messageRepository)
agent.setOutboundTransporter(messageSender)

app.get('/', async (req, res) => {
const agentDid = agent.publicDid
Expand Down
12 changes: 6 additions & 6 deletions src/__tests__/agents.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ describe('agents', () => {
const aliceMessages = new Subject()
const bobMessages = new Subject()

const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages)
const aliceAgentOutbound = new SubjectOutboundTransporter(bobMessages)
const bobAgentInbound = new SubjectInboundTransporter(bobMessages)
const bobAgentOutbound = new SubjectOutboundTransporter(aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages, bobMessages)
const bobAgentInbound = new SubjectInboundTransporter(bobMessages, aliceMessages)

aliceAgent = new Agent(aliceConfig, aliceAgentInbound, aliceAgentOutbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound)
aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(bobMessages))
await aliceAgent.init()

bobAgent = new Agent(bobConfig, bobAgentInbound, bobAgentOutbound)
bobAgent = new Agent(bobConfig, bobAgentInbound)
bobAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages))
await bobAgent.init()

const aliceConnectionAtAliceBob = await aliceAgent.connections.createConnection()
Expand Down
12 changes: 6 additions & 6 deletions src/__tests__/credentials.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ describe('credentials', () => {
const faberMessages = new Subject()
const aliceMessages = new Subject()

const faberAgentInbound = new SubjectInboundTransporter(faberMessages)
const faberAgentOutbound = new SubjectOutboundTransporter(aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages)
const aliceAgentOutbound = new SubjectOutboundTransporter(faberMessages)
const faberAgentInbound = new SubjectInboundTransporter(faberMessages, aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages, faberMessages)

faberAgent = new Agent(faberConfig, faberAgentInbound, faberAgentOutbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound, aliceAgentOutbound)
faberAgent = new Agent(faberConfig, faberAgentInbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound)
faberAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages))
aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(faberMessages))
await faberAgent.init()
await aliceAgent.init()

Expand Down
21 changes: 16 additions & 5 deletions src/__tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,25 @@ export async function waitForBasicMessage(

export class SubjectInboundTransporter implements InboundTransporter {
private subject: Subject<WireMessage>
private theirSubject: Subject<WireMessage>

public constructor(subject: Subject<WireMessage>) {
public constructor(subject: Subject<WireMessage>, theirSubject: Subject<WireMessage>) {
this.subject = subject
this.theirSubject = theirSubject
}

public start(agent: Agent) {
this.subscribe(agent, this.subject)
this.subscribe(agent)
}

private subscribe(agent: Agent, subject: Subject<WireMessage>) {
subject.subscribe({
next: (message: WireMessage) => agent.receiveMessage(message),
private subscribe(agent: Agent) {
this.subject.subscribe({
next: async (message: WireMessage) => {
const outboundMessage = await agent.receiveMessage(message)
if (outboundMessage) {
this.theirSubject.next(outboundMessage.payload)
}
},
})
}
}
Expand All @@ -150,6 +157,10 @@ export class SubjectOutboundTransporter implements OutboundTransporter {
const { payload } = outboundPackage
this.subject.next(payload)
}

public sendAndReceiveMessage(outboundPackage: OutboundPackage): Promise<any> {
throw new Error('Method not implemented.')
}
}

export async function makeConnection(agentA: Agent, agentB: Agent) {
Expand Down
10 changes: 2 additions & 8 deletions src/__tests__/ledger.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import indy from 'indy-sdk'
import type { SchemaId } from 'indy-sdk'
import { Agent, InboundTransporter, OutboundTransporter } from '..'
import { Agent, InboundTransporter } from '..'
import { DID_IDENTIFIER_REGEX, VERKEY_REGEX, isFullVerkey, isAbbreviatedVerkey } from '../utils/did'
import { genesisPath, sleep } from './helpers'
import { InitConfig } from '../types'
Expand All @@ -22,7 +22,7 @@ describe('ledger', () => {
let schemaId: SchemaId

beforeAll(async () => {
faberAgent = new Agent(faberConfig, new DummyInboundTransporter(), new DummyOutboundTransporter())
faberAgent = new Agent(faberConfig, new DummyInboundTransporter())
await faberAgent.init()
})

Expand Down Expand Up @@ -140,9 +140,3 @@ class DummyInboundTransporter implements InboundTransporter {
testLogger.test('Starting agent...')
}
}

class DummyOutboundTransporter implements OutboundTransporter {
public async sendMessage() {
testLogger.test('Sending message...')
}
}
12 changes: 6 additions & 6 deletions src/__tests__/proofs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ describe('Present Proof', () => {
const faberMessages = new Subject()
const aliceMessages = new Subject()

const faberAgentInbound = new SubjectInboundTransporter(faberMessages)
const faberAgentOutbound = new SubjectOutboundTransporter(aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages)
const aliceAgentOutbound = new SubjectOutboundTransporter(faberMessages)
const faberAgentInbound = new SubjectInboundTransporter(faberMessages, aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages, faberMessages)

faberAgent = new Agent(faberConfig, faberAgentInbound, faberAgentOutbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound, aliceAgentOutbound)
faberAgent = new Agent(faberConfig, faberAgentInbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound)
faberAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages))
aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(faberMessages))
await faberAgent.init()
await aliceAgent.init()

Expand Down
7 changes: 5 additions & 2 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ export class Agent {
public constructor(
initialConfig: InitConfig,
inboundTransporter: InboundTransporter,
outboundTransporter: OutboundTransporter,
messageRepository?: MessageRepository
) {
this.agentConfig = new AgentConfig(initialConfig)
Expand All @@ -82,7 +81,7 @@ export class Agent {
this.wallet = new IndyWallet(this.agentConfig)
const envelopeService = new EnvelopeService(this.wallet, this.agentConfig)

this.messageSender = new MessageSender(envelopeService, outboundTransporter)
this.messageSender = new MessageSender(envelopeService)
this.dispatcher = new Dispatcher(this.messageSender)
this.inboundTransporter = inboundTransporter

Expand Down Expand Up @@ -119,6 +118,10 @@ export class Agent {
this.registerModules()
}

public setOutboundTransporter(outboundTransporter: OutboundTransporter) {
this.messageSender.setOutboundTransporter(outboundTransporter)
}

public async init() {
await this.wallet.init()

Expand Down
7 changes: 5 additions & 2 deletions src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Handler } from './Handler'
import { MessageSender } from './MessageSender'
import { AgentMessage } from './AgentMessage'
import { InboundMessageContext } from './models/InboundMessageContext'
import { ReturnRouteTypes } from '../decorators/transport/TransportDecorator'

class Dispatcher {
private handlers: Handler[] = []
Expand All @@ -29,15 +30,17 @@ class Dispatcher {
if (outboundMessage) {
const threadId = outboundMessage.payload.threadId

if (!outboundMessage.connection.hasInboundEndpoint()) {
outboundMessage.payload.setReturnRouting(ReturnRouteTypes.all)
}

// check for return routing, with thread id
if (message.hasReturnRouting(threadId)) {
return await this.messageSender.packMessage(outboundMessage)
}

await this.messageSender.sendMessage(outboundMessage)
}

return outboundMessage || undefined
}

private getHandlerForType(messageType: string): Handler | undefined {
Expand Down
19 changes: 15 additions & 4 deletions src/agent/MessageSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import { JsonTransformer } from '../utils/JsonTransformer'

class MessageSender {
private envelopeService: EnvelopeService
private outboundTransporter: OutboundTransporter
private outboundTransporter?: OutboundTransporter

public constructor(envelopeService: EnvelopeService, outboundTransporter: OutboundTransporter) {
public constructor(envelopeService: EnvelopeService) {
this.envelopeService = envelopeService
}

public setOutboundTransporter(outboundTransporter: OutboundTransporter) {
this.outboundTransporter = outboundTransporter
}

Expand All @@ -21,18 +24,26 @@ class MessageSender {
}

public async sendMessage(outboundMessage: OutboundMessage): Promise<void> {
if (!this.outboundTransporter) {
throw new Error('Agent has no outbound transporter!')
}
const returnRoute = outboundMessage.payload.hasReturnRouting()
const outboundPackage = await this.envelopeService.packMessage(outboundMessage)
await this.outboundTransporter.sendMessage(outboundPackage, false)
await this.outboundTransporter.sendMessage(outboundPackage, returnRoute)
}

public async sendAndReceiveMessage<T extends AgentMessage>(
outboundMessage: OutboundMessage,
ReceivedMessageClass: Constructor<T>
): Promise<InboundMessageContext<T>> {
if (!this.outboundTransporter) {
throw new Error('Agent has no outbound transporter!')
}

outboundMessage.payload.setReturnRouting(ReturnRouteTypes.all)

const outboundPackage = await this.envelopeService.packMessage(outboundMessage)
const inboundPackedMessage = await this.outboundTransporter.sendMessage(outboundPackage, true)
const inboundPackedMessage = await this.outboundTransporter.sendAndReceiveMessage(outboundPackage)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not fully understanding. Why did you add a sendAndReceiveMessage to the outbound transporter?

Copy link
Contributor Author

@jakubkoci jakubkoci Apr 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I wanted to preserve the current behavior of await agent.routing.downloadMessages().

First, I thought that using sendMessage(outboundPackage, true) would be good enough. The problem is that in HTTP outbound we now pass the response immediately to the agent calling this.agent.receiveMessage(wireMessage). It means that the batch response is not processed inside downloadMessages anymore.

This is related to the problem we discussed in our last AFJ call about dispatching messages to the agent from a message handler. I listed as last point in the description:
"Update download messages functionality to process batch message and to remove sendAndReceiveMessage that is still implemented in the old way relaying on synchronicity of HTTP request-response."

I'm thinking if we could implement the solution as part of this PR. I usually tend to work in smaller batches. Maybe prioritize it at least and do it before WebSockets 🤷‍♂️ Maybe it's not so big change 🤔

const inboundUnpackedMessage = await this.envelopeService.unpackMessage(inboundPackedMessage)

const message = JsonTransformer.fromJSON(inboundUnpackedMessage.message, ReceivedMessageClass)
Expand Down
Loading