Skip to content

Commit

Permalink
chore: revert onMessage to iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
Juan Scolari committed Jun 17, 2022
1 parent 52ebf27 commit 85af9c8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 17 deletions.
11 changes: 3 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { IBaseComponent } from "@well-known-components/interfaces"
import { connect, NatsConnection } from "nats"
import mitt from "mitt"
import { natsComponent, INatsComponent, NatsEvents, Subscription, NatsMsg } from "./types"
import { natsComponent, INatsComponent, NatsEvents, Subscription } from "./types"

/**
* Create a NATS component (https://nats.io/)
Expand All @@ -25,14 +25,8 @@ export async function createNatsComponent(
natsConnection.publish(topic, message)
}

function subscribe(topic: string, onMessage: (_: NatsMsg) => Promise<void>): Subscription {
function subscribe(topic: string): Subscription {
const sub = natsConnection.subscribe(topic)
;(async () => {
for await (const message of sub) {
await onMessage({ data: message.data, subject: message.subject })
}
})().catch((err: any) => logger.error(`error processing subscription message; ${err.message}`))

sub.closed
.then(() => {
logger.info(`subscription closed for ${topic}`)
Expand All @@ -42,6 +36,7 @@ export async function createNatsComponent(
})
return {
unsubscribe: () => sub.unsubscribe(),
generator: sub,
}
}

Expand Down
14 changes: 6 additions & 8 deletions src/test-component.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { IBaseComponent, IConfigComponent } from "@well-known-components/interfaces"
const { connect } = require("mock-nats-client")
import { natsComponent, INatsComponent, Subscription, NatsEvents, NatsMsg } from "./types"
import { natsComponent, INatsComponent, Subscription, NatsEvents } from "./types"
import mitt from "mitt"

export async function createLocalNatsComponent(
Expand All @@ -13,14 +13,12 @@ export async function createLocalNatsComponent(
message ? client.publish(topic, message) : client.publish(topic, [])
}

function subscribe(topic: string, onMessage: (_: NatsMsg) => Promise<void>): Subscription {
const sid = client.subscribe(topic, (delivery: any, _replyTo: any, subject: string) => {
onMessage({ data: delivery, subject })
})
const unsubscribe = () => {
client.unsubscribe(sid)
function subscribe(topic: string): Subscription {
const sub = client.subscribe(topic)
return {
unsubscribe: () => client.unsubscribe(sub),
generator: sub,
}
return { unsubscribe }
}

async function start() {
Expand Down
3 changes: 2 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export type NatsMsg = {
}

export type Subscription = {
generator: AsyncIterable<NatsMsg>
unsubscribe: () => void
}

Expand All @@ -23,7 +24,7 @@ export type NatsEvents = {

export type INatsComponent = {
publish(topic: string, message?: Uint8Array): void
subscribe(topic: string, onMessage: (_: NatsMsg) => Promise<void>): Subscription
subscribe(topic: string): Subscription

start(): Promise<void>
stop(): Promise<void>
Expand Down

0 comments on commit 85af9c8

Please sign in to comment.