Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make a connection with mediator asynchronously #231

Merged
merged 4 commits into from
Apr 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 26 additions & 24 deletions samples/__tests__/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { OutboundPackage, InitConfig } from '../../src/types'
import { get, post } from '../http'
import { sleep, toBeConnectedWith, waitForBasicMessage } from '../../src/__tests__/helpers'
import indy from 'indy-sdk'
import testLogger from '../../src/__tests__/logger'
import logger from '../../src/__tests__/logger'

expect.extend({ toBeConnectedWith })

Expand All @@ -13,7 +13,7 @@ const aliceConfig: InitConfig = {
walletConfig: { id: 'e2e-alice' },
walletCredentials: { key: '00000000000000000000000000000Test01' },
autoAcceptConnections: true,
logger: testLogger,
logger: logger,
indy,
}

Expand All @@ -23,7 +23,7 @@ const bobConfig: InitConfig = {
walletConfig: { id: 'e2e-bob' },
walletCredentials: { key: '00000000000000000000000000000Test02' },
autoAcceptConnections: true,
logger: testLogger,
logger: logger,
indy,
}

Expand All @@ -44,26 +44,26 @@ describe('with mediator', () => {
})

test('Alice and Bob make a connection with mediator', async () => {
const aliceAgentSender = new HttpOutboundTransporter()
const aliceAgentReceiver = new PollingInboundTransporter()
const bobAgentSender = new HttpOutboundTransporter()
const bobAgentReceiver = new PollingInboundTransporter()

aliceAgent = new Agent(aliceConfig, aliceAgentReceiver, aliceAgentSender)
aliceAgent = new Agent(aliceConfig, aliceAgentReceiver)
aliceAgent.setOutboundTransporter(new HttpOutboundTransporter(aliceAgent))
await aliceAgent.init()

bobAgent = new Agent(bobConfig, bobAgentReceiver, bobAgentSender)
bobAgent = new Agent(bobConfig, bobAgentReceiver)
bobAgent.setOutboundTransporter(new HttpOutboundTransporter(bobAgent))
await bobAgent.init()

const aliceInbound = aliceAgent.routing.getInboundConnection()
const aliceInboundConnection = aliceInbound?.connection
const aliceKeyAtAliceMediator = aliceInboundConnection?.verkey
testLogger.test('aliceInboundConnection', aliceInboundConnection)
logger.test('aliceInboundConnection', aliceInboundConnection)

const bobInbound = bobAgent.routing.getInboundConnection()
const bobInboundConnection = bobInbound?.connection
const bobKeyAtBobMediator = bobInboundConnection?.verkey
testLogger.test('bobInboundConnection', bobInboundConnection)
logger.test('bobInboundConnection', bobInboundConnection)

// TODO This endpoint currently exists at mediator only for the testing purpose. It returns mediator's part of the pairwise connection.
const mediatorConnectionAtAliceMediator = JSON.parse(
Expand All @@ -73,8 +73,8 @@ describe('with mediator', () => {
await get(`${bobAgent.getMediatorUrl()}/api/connections/${bobKeyAtBobMediator}`)
)

testLogger.test('mediatorConnectionAtAliceMediator', mediatorConnectionAtAliceMediator)
testLogger.test('mediatorConnectionAtBobMediator', mediatorConnectionAtBobMediator)
logger.test('mediatorConnectionAtAliceMediator', mediatorConnectionAtAliceMediator)
logger.test('mediatorConnectionAtBobMediator', mediatorConnectionAtBobMediator)

expect(aliceInboundConnection).toBeConnectedWith(mediatorConnectionAtAliceMediator)
expect(bobInboundConnection).toBeConnectedWith(mediatorConnectionAtBobMediator)
Expand Down Expand Up @@ -104,7 +104,7 @@ describe('with mediator', () => {
throw new Error(`There is no connection for id ${aliceAtAliceBobId}`)
}

testLogger.test('aliceConnectionAtAliceBob\n', aliceConnectionAtAliceBob)
logger.test('aliceConnectionAtAliceBob\n', aliceConnectionAtAliceBob)

const message = 'hello, world'
await aliceAgent.basicMessages.sendMessage(aliceConnectionAtAliceBob, message)
Expand Down Expand Up @@ -141,14 +141,7 @@ class PollingInboundTransporter implements InboundTransporter {
private pollDownloadMessages(agent: Agent) {
const loop = async () => {
while (!this.stop) {
const downloadedMessages = await agent.routing.downloadMessages()
const messages = [...downloadedMessages]
testLogger.test('downloaded messages', messages)
while (messages && messages.length > 0) {
const message = messages.shift()
await agent.receiveMessage(message)
}

await agent.routing.downloadMessages()
await sleep(1000)
}
}
Expand All @@ -159,20 +152,29 @@ class PollingInboundTransporter implements InboundTransporter {
}

class HttpOutboundTransporter implements OutboundTransporter {
private agent: Agent

public constructor(agent: Agent) {
this.agent = agent
}
public async sendMessage(outboundPackage: OutboundPackage, receiveReply: boolean) {
const { payload, endpoint } = outboundPackage

if (!endpoint) {
throw new Error(`Missing endpoint. I don't know how and where to send the message.`)
}

testLogger.test(`Sending outbound message to connection ${outboundPackage.connection.id}`, outboundPackage.payload)
logger.debug(`Sending outbound message to connection ${outboundPackage.connection.id}`, outboundPackage.payload)

if (receiveReply) {
const response = await post(`${endpoint}`, JSON.stringify(payload))
const wireMessage = JSON.parse(response)
testLogger.test('received response', wireMessage)
return wireMessage
if (response) {
logger.debug(`Response received:\n ${response}`)
const wireMessage = JSON.parse(response)
this.agent.receiveMessage(wireMessage)
} else {
logger.debug(`No response received.`)
}
} else {
await post(`${endpoint}`, JSON.stringify(payload))
}
Expand Down
3 changes: 2 additions & 1 deletion samples/mediator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ app.set('json spaces', 2)
const messageRepository = new InMemoryMessageRepository()
const messageSender = new StorageOutboundTransporter(messageRepository)
const messageReceiver = new HttpInboundTransporter(app)
const agent = new Agent(config, messageReceiver, messageSender, messageRepository)
const agent = new Agent(config, messageReceiver, messageRepository)
agent.setOutboundTransporter(messageSender)

app.get('/', async (req, res) => {
const agentDid = agent.publicDid
Expand Down
12 changes: 6 additions & 6 deletions src/__tests__/agents.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ describe('agents', () => {
const aliceMessages = new Subject()
const bobMessages = new Subject()

const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages)
const aliceAgentOutbound = new SubjectOutboundTransporter(bobMessages)
const bobAgentInbound = new SubjectInboundTransporter(bobMessages)
const bobAgentOutbound = new SubjectOutboundTransporter(aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages, bobMessages)
const bobAgentInbound = new SubjectInboundTransporter(bobMessages, aliceMessages)

aliceAgent = new Agent(aliceConfig, aliceAgentInbound, aliceAgentOutbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound)
aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(bobMessages))
await aliceAgent.init()

bobAgent = new Agent(bobConfig, bobAgentInbound, bobAgentOutbound)
bobAgent = new Agent(bobConfig, bobAgentInbound)
bobAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages))
await bobAgent.init()

const aliceConnectionAtAliceBob = await aliceAgent.connections.createConnection()
Expand Down
12 changes: 6 additions & 6 deletions src/__tests__/credentials.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,13 @@ describe('credentials', () => {
const faberMessages = new Subject()
const aliceMessages = new Subject()

const faberAgentInbound = new SubjectInboundTransporter(faberMessages)
const faberAgentOutbound = new SubjectOutboundTransporter(aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages)
const aliceAgentOutbound = new SubjectOutboundTransporter(faberMessages)
const faberAgentInbound = new SubjectInboundTransporter(faberMessages, aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages, faberMessages)

faberAgent = new Agent(faberConfig, faberAgentInbound, faberAgentOutbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound, aliceAgentOutbound)
faberAgent = new Agent(faberConfig, faberAgentInbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound)
faberAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages))
aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(faberMessages))
await faberAgent.init()
await aliceAgent.init()

Expand Down
17 changes: 12 additions & 5 deletions src/__tests__/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,25 @@ export async function waitForBasicMessage(

export class SubjectInboundTransporter implements InboundTransporter {
private subject: Subject<WireMessage>
private theirSubject: Subject<WireMessage>

public constructor(subject: Subject<WireMessage>) {
public constructor(subject: Subject<WireMessage>, theirSubject: Subject<WireMessage>) {
this.subject = subject
this.theirSubject = theirSubject
}

public start(agent: Agent) {
this.subscribe(agent, this.subject)
this.subscribe(agent)
}

private subscribe(agent: Agent, subject: Subject<WireMessage>) {
subject.subscribe({
next: (message: WireMessage) => agent.receiveMessage(message),
private subscribe(agent: Agent) {
this.subject.subscribe({
next: async (message: WireMessage) => {
const outboundMessage = await agent.receiveMessage(message)
if (outboundMessage) {
this.theirSubject.next(outboundMessage.payload)
}
},
})
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/__tests__/ledger.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import indy from 'indy-sdk'
import type { SchemaId } from 'indy-sdk'
import { Agent, InboundTransporter, OutboundTransporter } from '..'
import { Agent, InboundTransporter } from '..'
import { DID_IDENTIFIER_REGEX, VERKEY_REGEX, isFullVerkey, isAbbreviatedVerkey } from '../utils/did'
import { genesisPath, sleep } from './helpers'
import { InitConfig } from '../types'
Expand All @@ -22,7 +22,7 @@ describe('ledger', () => {
let schemaId: SchemaId

beforeAll(async () => {
faberAgent = new Agent(faberConfig, new DummyInboundTransporter(), new DummyOutboundTransporter())
faberAgent = new Agent(faberConfig, new DummyInboundTransporter())
await faberAgent.init()
})

Expand Down Expand Up @@ -140,9 +140,3 @@ class DummyInboundTransporter implements InboundTransporter {
testLogger.test('Starting agent...')
}
}

class DummyOutboundTransporter implements OutboundTransporter {
public async sendMessage() {
testLogger.test('Sending message...')
}
}
12 changes: 6 additions & 6 deletions src/__tests__/proofs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,13 @@ describe('Present Proof', () => {
const faberMessages = new Subject()
const aliceMessages = new Subject()

const faberAgentInbound = new SubjectInboundTransporter(faberMessages)
const faberAgentOutbound = new SubjectOutboundTransporter(aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages)
const aliceAgentOutbound = new SubjectOutboundTransporter(faberMessages)
const faberAgentInbound = new SubjectInboundTransporter(faberMessages, aliceMessages)
const aliceAgentInbound = new SubjectInboundTransporter(aliceMessages, faberMessages)

faberAgent = new Agent(faberConfig, faberAgentInbound, faberAgentOutbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound, aliceAgentOutbound)
faberAgent = new Agent(faberConfig, faberAgentInbound)
aliceAgent = new Agent(aliceConfig, aliceAgentInbound)
faberAgent.setOutboundTransporter(new SubjectOutboundTransporter(aliceMessages))
aliceAgent.setOutboundTransporter(new SubjectOutboundTransporter(faberMessages))
await faberAgent.init()
await aliceAgent.init()

Expand Down
18 changes: 15 additions & 3 deletions src/agent/Agent.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventEmitter } from 'events'
import { Logger } from '../logger'
import { InitConfig } from '../types'
import { IndyWallet } from '../wallet/IndyWallet'
Expand Down Expand Up @@ -33,6 +34,7 @@ import { LedgerModule } from '../modules/ledger/LedgerModule'

export class Agent {
protected logger: Logger
protected eventEmitter: EventEmitter
protected wallet: Wallet
protected agentConfig: AgentConfig
protected messageReceiver: MessageReceiver
Expand Down Expand Up @@ -66,7 +68,6 @@ export class Agent {
public constructor(
initialConfig: InitConfig,
inboundTransporter: InboundTransporter,
outboundTransporter: OutboundTransporter,
messageRepository?: MessageRepository
) {
this.agentConfig = new AgentConfig(initialConfig)
Expand All @@ -79,10 +80,16 @@ export class Agent {
indy: initialConfig.indy != undefined,
logger: initialConfig.logger != undefined,
})

this.eventEmitter = new EventEmitter()
this.eventEmitter.addListener('agentMessage', async (payload) => {
await this.receiveMessage(payload)
})

this.wallet = new IndyWallet(this.agentConfig)
const envelopeService = new EnvelopeService(this.wallet, this.agentConfig)

this.messageSender = new MessageSender(envelopeService, outboundTransporter)
this.messageSender = new MessageSender(envelopeService)
this.dispatcher = new Dispatcher(this.messageSender)
this.inboundTransporter = inboundTransporter

Expand Down Expand Up @@ -119,6 +126,10 @@ export class Agent {
this.registerModules()
}

public setOutboundTransporter(outboundTransporter: OutboundTransporter) {
this.messageSender.setOutboundTransporter(outboundTransporter)
}

public async init() {
await this.wallet.init()

Expand Down Expand Up @@ -182,7 +193,8 @@ export class Agent {
this.provisioningService,
this.messagePickupService,
this.connectionService,
this.messageSender
this.messageSender,
this.eventEmitter
)

this.basicMessages = new BasicMessagesModule(this.dispatcher, this.basicMessageService, this.messageSender)
Expand Down
7 changes: 5 additions & 2 deletions src/agent/Dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Handler } from './Handler'
import { MessageSender } from './MessageSender'
import { AgentMessage } from './AgentMessage'
import { InboundMessageContext } from './models/InboundMessageContext'
import { ReturnRouteTypes } from '../decorators/transport/TransportDecorator'

class Dispatcher {
private handlers: Handler[] = []
Expand All @@ -29,15 +30,17 @@ class Dispatcher {
if (outboundMessage) {
const threadId = outboundMessage.payload.threadId

if (!outboundMessage.connection.hasInboundEndpoint()) {
outboundMessage.payload.setReturnRouting(ReturnRouteTypes.all)
}

// check for return routing, with thread id
if (message.hasReturnRouting(threadId)) {
return await this.messageSender.packMessage(outboundMessage)
}

await this.messageSender.sendMessage(outboundMessage)
}

return outboundMessage || undefined
}

private getHandlerForType(messageType: string): Handler | undefined {
Expand Down
Loading