Skip to content

Commit

Permalink
test: add message queue and some enhancements
Browse files Browse the repository at this point in the history
  • Loading branch information
amagyar-iohk committed Sep 22, 2023
1 parent 9e8fd71 commit 2122535
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {configDotenv} from "dotenv"
configDotenv()

export class EnvironmentVariables {
public static oobUrl: string = process.env.MEDIATOR_OOB_URL!
public static mediatorOobUrl: string = process.env.MEDIATOR_OOB_URL!
public static agentUrl: string = process.env.PRISM_AGENT_URL!
public static publishedDid: string = process.env.PUBLISHED_DID!
public static schemaId: string = process.env.SCHEMA_ID!
Expand Down
100 changes: 72 additions & 28 deletions integration-tests/e2e-tests/tests/sdk/WalletSdk.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import {PlutoInMemory} from "./src/PlutoInMemory"

export class WalletSdk extends Ability implements Initialisable, Discardable {
sdk!: Agent
credentialOfferStack: Message[] = []
proofRequestStack: Message[] = []
issuedCredentialStack: Message[] = []
receivedMessages: string[] = []
messages: MessageQueue = new MessageQueue()

static async withANewInstance(): Promise<Ability> {
return new WalletSdk(await WalletSdkBuilder.createInstance())
Expand All @@ -34,19 +31,19 @@ export class WalletSdk extends Ability implements Initialisable, Discardable {

static credentialOfferStackSize(): QuestionAdapter<number> {
return Question.about("credential offer stack", actor => {
return WalletSdk.as(actor).credentialOfferStack.length
return WalletSdk.as(actor).messages.credentialOfferStack.length
})
}

static issuedCredentialStackSize(): QuestionAdapter<number> {
return Question.about("issued credential stack", actor => {
return WalletSdk.as(actor).issuedCredentialStack.length
return WalletSdk.as(actor).messages.issuedCredentialStack.length
})
}

static proofOfRequestStackSize(): QuestionAdapter<number> {
return Question.about("proof of request stack", actor => {
return WalletSdk.as(actor).proofRequestStack.length
return WalletSdk.as(actor).messages.proofRequestStack.length
})
}

Expand All @@ -57,9 +54,9 @@ export class WalletSdk extends Ability implements Initialisable, Discardable {
}) => Promise<void>): Interaction {
return Interaction.where("#actor uses wallet sdk", async actor => {
await callback(WalletSdk.as(actor).sdk, {
credentialOfferStack: WalletSdk.as(actor).credentialOfferStack,
issuedCredentialStack: WalletSdk.as(actor).issuedCredentialStack,
proofRequestStack: WalletSdk.as(actor).proofRequestStack
credentialOfferStack: WalletSdk.as(actor).messages.credentialOfferStack,
issuedCredentialStack: WalletSdk.as(actor).messages.issuedCredentialStack,
proofRequestStack: WalletSdk.as(actor).messages.proofRequestStack
})
})
}
Expand All @@ -69,28 +66,15 @@ export class WalletSdk extends Ability implements Initialisable, Discardable {
}

async initialise(): Promise<void> {
await this.sdk.start()

this.sdk.addListener(
ListenerKey.MESSAGE,
async (messages: Domain.Message[]) => {
ListenerKey.MESSAGE, (messages: Domain.Message[]) => {
for (const message of messages) {
// checks if sdk already received message
if (this.receivedMessages.includes(message.id)) {
return
}

this.receivedMessages.push(message.id)
if (message.piuri.includes("/offer-credential")) {
this.credentialOfferStack.push(message)
} else if (message.piuri.includes("/present-proof")) {
this.proofRequestStack.push(message)
} else if (message.piuri.includes("/issue-credential")) {
this.issuedCredentialStack.push(message)
}
this.messages.enqueue(message)
}
}
)

await this.sdk.start()
}

isInitialised(): boolean {
Expand All @@ -100,7 +84,7 @@ export class WalletSdk extends Ability implements Initialisable, Discardable {

class WalletSdkBuilder {
private static async getMediatorDidThroughOob(): Promise<string> {
const response = await axios.get(EnvironmentVariables.oobUrl)
const response = await axios.get(EnvironmentVariables.mediatorOobUrl)
const encodedData = response.data.split("?_oob=")[1]
const oobData = JSON.parse(Buffer.from(encodedData, "base64").toString())
return oobData.from
Expand Down Expand Up @@ -143,3 +127,63 @@ class WalletSdkBuilder {
)
}
}

/**
* Helper class for message queueing processor
*/
class MessageQueue {
private processingId: NodeJS.Timeout | null = null
private queue: Message[] = []

credentialOfferStack: Message[] = []
proofRequestStack: Message[] = []
issuedCredentialStack: Message[] = []
receivedMessages: string[] = []

enqueue(message: Message) {
this.queue.push(message)

// auto start processing messages
if (!this.processingId) {
this.processMessages()
}
}

dequeue(): Message {
return this.queue.shift()!
}

// Check if the queue is empty
isEmpty(): boolean {
return this.queue.length === 0
}

// Get the number of messages in the queue
size(): number {
return this.queue.length
}

processMessages() {
this.processingId = setInterval(() => {
if (!this.isEmpty()) {
const message: Message = this.dequeue()
// checks if sdk already received message
if (this.receivedMessages.includes(message.id)) {
return
}

this.receivedMessages.push(message.id)
if (message.piuri.includes("/offer-credential")) {
this.credentialOfferStack.push(message)
} else if (message.piuri.includes("/present-proof")) {
this.proofRequestStack.push(message)
} else if (message.piuri.includes("/issue-credential")) {
this.issuedCredentialStack.push(message)
}
} else {
clearInterval(this.processingId!)
this.processingId = null
}
}, 50)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
import {Utils} from "../../Utils"
import {randomUUID} from "crypto"
import {axiosInstance} from "../steps/LifecycleSteps"
import * as fs from "fs"

export class CloudAgentConfiguration {
static publishedDid: string
Expand All @@ -24,6 +25,18 @@ export class CloudAgentConfiguration {
await this.prepareSchema()

this.isInitialized = true

Utils.appendToNotes(`Mediator: ${EnvironmentVariables.mediatorOobUrl}`)
Utils.appendToNotes(`Agent: ${EnvironmentVariables.agentUrl}`)
Utils.appendToNotes(`DID: ${this.publishedDid}`)
Utils.appendToNotes(`Schema: ${this.schemaId}`)
Utils.appendToNotes(`SDK Version: ${this.getSdkVersion()}`)
}

private static getSdkVersion(): string {
const file = "node_modules/@input-output-hk/atala-prism-wallet-sdk/package.json"
const json = JSON.parse(fs.readFileSync(file).toString())
return json.version
}

/**
Expand All @@ -37,7 +50,7 @@ export class CloudAgentConfiguration {
this.publishedDid = EnvironmentVariables.publishedDid
return
} catch (err) {
console.warn("DID not found. Creating a new one and publishing it.")
Utils.appendToNotes("DID not found. Creating a new one and publishing it.")
}

const creationData = new CreateManagedDidRequest()
Expand Down Expand Up @@ -83,8 +96,6 @@ export class CloudAgentConfiguration {
}
}, 1000)
})

Utils.appendToNotes(`Created new DID: ${this.publishedDid}`)
}

/**
Expand All @@ -98,7 +109,7 @@ export class CloudAgentConfiguration {
this.schemaId = EnvironmentVariables.schemaId
return
} catch (err) {
console.warn("Schema not found. Creating a new one.")
Utils.appendToNotes("Schema not found. Creating a new one.")
}

const credentialSchemaInput = new CredentialSchemaInput()
Expand Down Expand Up @@ -133,7 +144,6 @@ export class CloudAgentConfiguration {
)

this.schemaId = schemaResponse.data.guid

Utils.appendToNotes(`Created new schema: ${this.schemaId}`)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export class CloudAgentWorkflow {
Notepad.notes().get("connectionId")
)
await cloudAgent.attemptsTo(
Wait.upTo(Duration.ofSeconds(60)).until(
Wait.upTo(Duration.ofMinutes(2)).until(
Questions.httpGet(`/connections/${connectionId}`),
Expectations.propertyValueToBe("state", state)
)
Expand All @@ -51,7 +51,7 @@ export class CloudAgentWorkflow {

static async verifyCredentialState(cloudAgent: Actor, recordId: string, state: string) {
await cloudAgent.attemptsTo(
Wait.upTo(Duration.ofSeconds(60)).until(
Wait.upTo(Duration.ofMinutes(2)).until(
Questions.httpGet(`/issue-credentials/records/${recordId}`),
Expectations.propertyValueToBe("protocolState", state)
)
Expand All @@ -63,7 +63,7 @@ export class CloudAgentWorkflow {
Notepad.notes().get("presentationId")
)
await cloudAgent.attemptsTo(
Wait.upTo(Duration.ofSeconds(60)).until(
Wait.upTo(Duration.ofMinutes(2)).until(
Questions.httpGet(`/present-proof/presentations/${presentationId}`),
Expectations.propertyValueToBe("status", state)
)
Expand Down
28 changes: 11 additions & 17 deletions integration-tests/e2e-tests/tests/sdk/src/EdgeAgentWorkflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ export class EdgeAgentWorkflow {

static async waitForCredentialOffer(edgeAgent: Actor, numberOfCredentialOffer: number = 1) {
await edgeAgent.attemptsTo(
Wait.upTo(Duration.ofSeconds(60)).until(
Wait.upTo(Duration.ofMinutes(2)).until(
WalletSdk.credentialOfferStackSize(),
equals(numberOfCredentialOffer)
)
Expand All @@ -26,7 +26,7 @@ export class EdgeAgentWorkflow {

static async waitToReceiveCredentialIssuance(edgeAgent: Actor, expectedNumberOfCredentials: number) {
await edgeAgent.attemptsTo(
Wait.upTo(Duration.ofSeconds(60)).until(
Wait.upTo(Duration.ofMinutes(2)).until(
WalletSdk.issuedCredentialStackSize(),
equals(expectedNumberOfCredentials)
)
Expand All @@ -37,7 +37,7 @@ export class EdgeAgentWorkflow {
await edgeAgent.attemptsTo(
WalletSdk.execute(async (sdk, messages) => {
await Utils.repeat(numberOfCredentials, async () => {
const issuedCredential = messages.issuedCredentialStack.pop()!
const issuedCredential = messages.issuedCredentialStack.shift()!
const issueCredential = IssueCredential.fromMessage(issuedCredential)
await sdk.processIssuedCredentialMessage(issueCredential)
})
Expand All @@ -48,24 +48,21 @@ export class EdgeAgentWorkflow {
static async acceptCredential(edgeAgent: Actor) {
await edgeAgent.attemptsTo(
WalletSdk.execute(async (sdk, messages) => {
const message = OfferCredential.fromMessage(messages.credentialOfferStack.pop()!)
const requestCredential =
await sdk.prepareRequestCredentialWithIssuer(message)
const message = OfferCredential.fromMessage(messages.credentialOfferStack.shift()!)
const requestCredential = await sdk.prepareRequestCredentialWithIssuer(message)
const requestCredentialMessage = requestCredential.makeMessage()
try {
await sdk.sendMessage(requestCredential.makeMessage())
await sdk.sendMessage(requestCredentialMessage)
} catch (e) {
console.error(
"Accepting credential shouldn't throw exception",
new Error().stack?.split("\n")[1].trim(),
)
//
}
})
)
}

static async waitForProofRequest(edgeAgent: Actor) {
await edgeAgent.attemptsTo(
Wait.upTo(Duration.ofSeconds(60)).until(
Wait.upTo(Duration.ofMinutes(2)).until(
WalletSdk.proofOfRequestStackSize(),
equals(1),
),
Expand All @@ -78,7 +75,7 @@ export class EdgeAgentWorkflow {
const credentials = await sdk.verifiableCredentials()
const credential = credentials[0]
const requestPresentationMessage = RequestPresentation.fromMessage(
messages.proofRequestStack.pop()!,
messages.proofRequestStack.shift()!,
)
const presentation = await sdk.createPresentationForRequestProof(
requestPresentationMessage,
Expand All @@ -87,10 +84,7 @@ export class EdgeAgentWorkflow {
try {
await sdk.sendMessage(presentation.makeMessage())
} catch (e) {
console.error(
"Send present-proof shouldn't throw exception",
new Error().stack?.split("\n")[1].trim(),
)
//
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ When("{actor} accepts {int} credentials offer at once from {actor}",
})
await cloudAgent.attemptsTo(Notepad.notes().set("recordIdList", recordIdList))

await EdgeAgentWorkflow.waitForCredentialOffer(edgeAgent, 3)
await EdgeAgentWorkflow.waitForCredentialOffer(edgeAgent, numberOfCredentials)

await Utils.repeat(numberOfCredentials, async () => {
await EdgeAgentWorkflow.acceptCredential(edgeAgent)
Expand Down

0 comments on commit 2122535

Please sign in to comment.