Skip to content

Commit

Permalink
chore: replace iterators with onMessage
Browse files Browse the repository at this point in the history
  • Loading branch information
Juan Scolari committed Jun 17, 2022
1 parent 0ac8ca9 commit 52ebf27
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 11 deletions.
11 changes: 8 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IBaseComponent } from "@well-known-components/interfaces"
import { connect, NatsConnection } from "nats"
import mitt from "mitt"
import { natsComponent, INatsComponent, NatsEvents, Subscription } from "./types"
import { natsComponent, INatsComponent, NatsEvents, Subscription, NatsMsg } from "./types"

/**
* Create a NATS component (https://nats.io/)
Expand All @@ -25,8 +25,14 @@ export async function createNatsComponent(
natsConnection.publish(topic, message)
}

function subscribe(topic: string): Subscription {
function subscribe(topic: string, onMessage: (_: NatsMsg) => Promise<void>): Subscription {
const sub = natsConnection.subscribe(topic)
;(async () => {
for await (const message of sub) {
await onMessage({ data: message.data, subject: message.subject })
}
})().catch((err: any) => logger.error(`error processing subscription message; ${err.message}`))

sub.closed
.then(() => {
logger.info(`subscription closed for ${topic}`)
Expand All @@ -36,7 +42,6 @@ export async function createNatsComponent(
})
return {
unsubscribe: () => sub.unsubscribe(),
generator: sub,
}
}

Expand Down
14 changes: 8 additions & 6 deletions src/test-component.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IBaseComponent, IConfigComponent } from "@well-known-components/interfaces"
const { connect } = require("mock-nats-client")
import { natsComponent, INatsComponent, Subscription, NatsEvents } from "./types"
import { natsComponent, INatsComponent, Subscription, NatsEvents, NatsMsg } from "./types"
import mitt from "mitt"

export async function createLocalNatsComponent(
Expand All @@ -13,12 +13,14 @@ export async function createLocalNatsComponent(
message ? client.publish(topic, message) : client.publish(topic, [])
}

function subscribe(topic: string): Subscription {
const sub = client.subscribe(topic)
return {
unsubscribe: () => client.unsubscribe(sub),
generator: sub,
function subscribe(topic: string, onMessage: (_: NatsMsg) => Promise<void>): Subscription {
const sid = client.subscribe(topic, (delivery: any, _replyTo: any, subject: string) => {
onMessage({ data: delivery, subject })
})
const unsubscribe = () => {
client.unsubscribe(sid)
}
return { unsubscribe }
}

async function start() {
Expand Down
3 changes: 1 addition & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ export type NatsMsg = {
}

export type Subscription = {
generator: AsyncIterable<NatsMsg>
unsubscribe: () => void
}

Expand All @@ -24,7 +23,7 @@ export type NatsEvents = {

export type INatsComponent = {
publish(topic: string, message?: Uint8Array): void
subscribe(topic: string): Subscription
subscribe(topic: string, onMessage: (_: NatsMsg) => Promise<void>): Subscription

start(): Promise<void>
stop(): Promise<void>
Expand Down

0 comments on commit 52ebf27

Please sign in to comment.