Skip to content

Commit

Permalink
feat: use callbacks instead of async generators (#30)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hugo Arregui authored Aug 29, 2022
1 parent faca1f3 commit 2265470
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 45 deletions.
5 changes: 0 additions & 5 deletions etc/nats-component.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,9 @@ export function encodeJson<T = unknown>(d: T): Uint8Array;

// @public
export type Subscription = {
generator: AsyncIterable<NatsMsg>;
unsubscribe: () => void;
};

// Warnings were encountered during analysis:
//
// src/types.ts:21:3 - (ae-forgotten-export) The symbol "NatsMsg" needs to be exported by the entry point index.d.ts

// (No @packageDocumentation comment for this package)

```
44 changes: 22 additions & 22 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { IBaseComponent } from "@well-known-components/interfaces"
import { connect, NatsConnection, JSONCodec } from "nats"
// import * as nats from "nats"
import { connect, Msg, NatsConnection, NatsError } from "nats"
import mitt from "mitt"
import { natsComponent, INatsComponent, NatsEvents, Subscription } from "./types"
import { natsComponent, INatsComponent, NatsEvents, Subscription, SubscriptionCallback } from "./types"

export { createLocalNatsComponent } from "./test-component"
export { encodeJson, decodeJson } from "./codecs"
Expand Down Expand Up @@ -33,30 +32,33 @@ export async function createNatsComponent(
natsConnection.publish(topic, message)
}

function subscribe(topic: string): Subscription {
function subscribe(topic: string, cb: SubscriptionCallback): Subscription {
if (!natsConnection) {
throw new Error("NATS component was not started yet")
}

const sub = natsConnection.subscribe(topic)
const sub = natsConnection.subscribe(topic, {
callback: (err: NatsError | null, msg: Msg) => {
cb(err, msg)
},
})

sub.closed
.then(() => {
logger.debug(`Subscription closed for topic`, { topic })
})
.catch((err: any) => {
logger.error(`Subscription closed with an error`, err)
})
return {
unsubscribe: () => sub.unsubscribe(),
generator: sub,
}

return sub
}

let didStop = false

async function printStatus(connection: NatsConnection) {
for await (const s of connection.status()) {
logger.info(`Status change`, s as any);
logger.info(`Status change`, s as any)
}
}

Expand All @@ -65,20 +67,18 @@ export async function createNatsComponent(
natsConnection = await connect({ servers: `${natsUrl}` })
printStatus(natsConnection).catch(logger.error)

natsConnection
.closed()
.then((err) => {
if (!didStop) {
logger.error(`NATS connection lost`)
if (err) {
logger.error(err)
}
// TODO: gracefully quit, this is an unrecoverable state
process.exit(1)
natsConnection.closed().then((err) => {
if (!didStop) {
logger.error(`NATS connection lost`)
if (err) {
logger.error(err)
}
})
// TODO: gracefully quit, this is an unrecoverable state
process.exit(1)
}
})
events.emit("connected")
logger.info(`Connected`, { server: natsConnection.getServer() });
logger.info(`Connected`, { server: natsConnection.getServer() })
} catch (error) {
logger.error(`An error occurred trying to connect to the NATS server: ${natsUrl}`)
throw error
Expand Down
29 changes: 13 additions & 16 deletions src/test-component.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
import { IBaseComponent } from "@well-known-components/interfaces"
import { pushableChannel } from "@well-known-components/pushable-channel"
import mitt from "mitt"
import { INatsComponent, Subscription, NatsEvents, NatsMsg } from "./types"

type PushableChannel = {
push: (value: NatsMsg, resolve: (err?: any) => void) => void
}
import { INatsComponent, Subscription, NatsEvents, SubscriptionCallback } from "./types"

/**
* Create a local NATS component, for testing purposes
* @public
*/
export async function createLocalNatsComponent(): Promise<INatsComponent & IBaseComponent> {
const channels = new Map<string, PushableChannel>()
const callbacks = new Map<string, Set<SubscriptionCallback>>()
const events = mitt<NatsEvents>()

function publish(topic: string, data: Uint8Array): void {
channels.forEach((ch, pattern) => {
callbacks.forEach((cbs, pattern) => {
const sPattern = pattern.split(".")
const sTopic = topic.split(".")

Expand All @@ -30,18 +25,20 @@ export async function createLocalNatsComponent(): Promise<INatsComponent & IBase
}
}

ch.push({ subject: topic, data }, console.log)
for (const cb of cbs) {
cb(null, { subject: topic, data })
}
})
}

function subscribe(pattern: string): Subscription {
const channel = pushableChannel<NatsMsg>(function deferCloseChannel() {
channels.delete(pattern)
})
channels.set(pattern, channel)
function subscribe(pattern: string, cb: SubscriptionCallback): Subscription {
const cbs = callbacks.get(pattern) || new Set<SubscriptionCallback>()
cbs.add(cb)
callbacks.set(pattern, cbs)
return {
unsubscribe: () => channel.close(),
generator: channel.iterable,
unsubscribe: () => {
cbs.delete(cb)
},
}
}

Expand Down
9 changes: 7 additions & 2 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,22 @@ export type NatsMsg = {
* @public
*/
export type Subscription = {
generator: AsyncIterable<NatsMsg>
unsubscribe: () => void
}

/**
* Subscription Callback
* @public
*/
export type SubscriptionCallback = (error: Error | null, msg: NatsMsg) => void

export type NatsEvents = {
connected: void
}

export type INatsComponent = {
publish(topic: string, message?: Uint8Array): void
subscribe(topic: string): Subscription
subscribe(topic: string, cb: SubscriptionCallback): Subscription

start(): Promise<void>
stop(): Promise<void>
Expand Down

0 comments on commit 2265470

Please sign in to comment.