Skip to content

Commit

Permalink
Continue Water Broker rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
wasdennnoch committed Jul 14, 2024
1 parent 3187900 commit 3cade2a
Show file tree
Hide file tree
Showing 15 changed files with 745 additions and 451 deletions.
2 changes: 1 addition & 1 deletion water/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"dist"
],
"dependencies": {
"kafkajs": "^2.2.4"
"valibot": "^0.36.0"
},
"devDependencies": {
"@types/node": "^20.14.10",
Expand Down
332 changes: 212 additions & 120 deletions water/src/broker/BrokerClient.ts
Original file line number Diff line number Diff line change
@@ -1,124 +1,216 @@
import { Logger } from "../logging/Logger.js";
import { CountDownLatch } from "../util/CountDownLatch.js";
import type { IBrokerConnection } from "./BrokerConnection.js";
import type { IBrokerMessageHeaders } from "./BrokerMessageHeaders.js";
import { BrokerMessage } from "./BrokerMessage.js";

const TAG = "BrokerClient";

export type BrokerEventListener<T> = (msg: BrokerMessage<T>) => void | Promise<void>;
export type BrokerMessageListener<T> = (clusterId: string, msg: BrokerMessage<T>) => void | Promise<void>;
export interface ClusterResult<T> {
responses: Map<string, BrokerMessage<T>>;
timeout: boolean;
import type { BrokerConnection, TopicListener } from "./BrokerConnection.js";
import type { BrokerMessage } from "./BrokerMessage.js";
import type { BrokerMessageHeaders } from "./BrokerMessageHeaders.js";
import { RpcClient } from "./rpc/RpcClient.js";
import type { RpcRequestMessage, RpcResponse } from "./rpc/RpcMessage.js";
import type { BaseSubclient } from "./Subclients.js";
import { BrokerClientOptions, ConsumerSubclient, ProducerSubclient } from "./Subclients.js";

export abstract class BrokerClient {

private static readonly TAG = "BrokerClient";

private readonly topics: Map<string, TopicMetadata> = new Map();

public constructor(public readonly connection: BrokerConnection) { }

public consumer<T>(
topic: string,
key: string,
schema: T, // TODO
options: BrokerClientOptions = new BrokerClientOptions(),
callback: (msg: BrokerMessage<T>) => Promise<void>,
): ConsumerSubclient<T> {
Logger.debug(BrokerClient.TAG, `Creating consumer for key '${key}' in topic '${topic}'`);
const client = new ConsumerSubclient(this.connection, this, topic, key, options, schema, callback);
this.registerSubclient(client);
return client;
}

public producer<T>(
topic: string,
key: string,
schema: T, // TODO
options: BrokerClientOptions = new BrokerClientOptions(),
): ProducerSubclient<T> {
Logger.debug(BrokerClient.TAG, `Creating producer for key '${key}' in topic '${topic}'`);
const client = new ProducerSubclient(this.connection, this, topic, key, options, schema);
this.registerSubclient(client);
return client;
}

public rpc<RequestT, ResponseT>(
topic: string,
key: string,
requestSchema: RequestT, // TODO
responseSchema: ResponseT, // TODO
options: BrokerClientOptions = new BrokerClientOptions(),
callback: (msg: RpcRequestMessage<RequestT, ResponseT>) => Promise<RpcResponse<ResponseT>>,
): RpcClient<RequestT, ResponseT> {
return new RpcClient(
this,
topic,
key,
options,
requestSchema,
responseSchema,
callback,
);
}

private registerSubclient(subclient: BaseSubclient): void {
const topic = subclient.topic;
if (!this.topics.has(topic)) {
this.topics.set(topic, new TopicMetadata(this.connection, topic));
}
const metadata = this.topics.get(topic)!;
metadata.registerSubclient(subclient);
}

// Private API
public deregisterSubclient(subclient: BaseSubclient): void {
const topic = subclient.topic;
if (this.topics.has(topic)) {
const metadata = this.topics.get(topic)!;
metadata.deregisterSubclient(subclient);
if (metadata.isEmpty) {
this.topics.delete(topic);
}
}
}

public destroy(): void {
Logger.debug(BrokerClient.TAG, `Destroying BrokerClient with active topics: ${[...this.topics.keys()]}`);
while (this.topics.size > 0) {
const [topic, metadata] = this.topics.entries().next().value as [string, TopicMetadata];
metadata.destroy();
this.topics.delete(topic);
}
}

}

class TopicMetadata {

private static readonly TAG = "TopicMetadata";

private readonly keys: Map<string, KeyMetadata> = new Map();
private isDestroyed: boolean = false;
private connectionListener: TopicListener | null = null;

public constructor(
private readonly connection: BrokerConnection,
private readonly topic: string,
) { }

public get isEmpty(): boolean {
return this.keys.size === 0;
}

public registerSubclient(subclient: BaseSubclient): void {
if (subclient.topic !== this.topic) {
throw new Error(`Attempting to register subclient with topic '${subclient.topic}' in TopicMetadata of '${this.topic}'`);
}
Logger.debug(TopicMetadata.TAG, `Adding ${this.constructor.name} for key '${subclient.key}' in topic '${this.topic}'`);
const metadata = this.getOrCreateKeyMetadata(subclient.key);
if (subclient instanceof ConsumerSubclient) {
if (metadata.consumers.size === 0 && this.connectionListener === null) {
Logger.debug(TopicMetadata.TAG, `Creating new connection listener for topic '${this.topic}'`);
this.connectionListener = (topic, key, value, headers) => this.onTopicMessage(topic, key, value, headers);
this.connection.on(this.topic, this.connectionListener);
}
metadata.consumers.add(subclient);
} else if (subclient instanceof ProducerSubclient) {
metadata.producers.add(subclient);
}
}

public deregisterSubclient(subclient: BaseSubclient): void {
Logger.debug(TopicMetadata.TAG, `Removing ${this.constructor.name} for key '${subclient.key}' in topic '${this.topic}'`);
const metadata = this.getExistingKeyMetadata(subclient.key);
if (metadata) {
metadata.producers.delete(subclient as ProducerSubclient<any>);
metadata.consumers.delete(subclient as ConsumerSubclient<any>);
this.maybeCleanupKeyMetadata(metadata);
}
}

private maybeCleanupKeyMetadata(metadata: KeyMetadata): void {
if (metadata.isEmpty) {
this.keys.delete(metadata.key);
}
if (this.isEmpty && this.connectionListener !== null) {
Logger.debug(TopicMetadata.TAG, `Removing connection listener for topic '${this.topic}' after key cleanup`);
this.connection.off(this.topic, this.connectionListener);
this.connectionListener = null;
}
}

private getOrCreateKeyMetadata(key: string): KeyMetadata {
if (!this.keys.has(key)) {
this.keys.set(key, new KeyMetadata(key));
}
return this.keys.get(key)!;
}

private getExistingKeyMetadata(key: string): KeyMetadata | null {
return this.keys.get(key) ?? null;
}

private onTopicMessage(
topic: string,
key: string,
value: string,
headers: BrokerMessageHeaders,
): void {
const metadata = this.getExistingKeyMetadata(key);
if (!metadata) {
return;
}
for (const consumer of metadata.consumers) {
consumer
.onIncomingMessage(value, headers)
.catch(e => Logger.error(TopicMetadata.TAG,
`Uncaught error in BrokerClient listener for key '${key}' in topic '${topic}'`, e));
}
}

public destroy(): void {
if (this.isDestroyed) {
return;
}
while (this.keys.size > 0) {
const [key, metadata] = this.keys.entries().next().value as [string, KeyMetadata];
metadata.destroy();
this.keys.delete(key);
}
if (this.connectionListener !== null) {
Logger.debug(TopicMetadata.TAG, `Removing connection listener for topic '${this.topic}' during destroy`);
this.connection.off(this.topic, this.connectionListener);
this.connectionListener = null;
}
}

}

export class BrokerClient<T> {
private keyListeners: Map<string, Set<BrokerEventListener<T>>> = new Map();

public constructor(
private readonly connection: IBrokerConnection,
private readonly topicName: string
) {
Logger.debug(TAG, `Initializing BrokerClient with topic '${topicName}'`);
connection.on(topicName, this.onTopicMessage.bind(this));
}

protected async send(
key: string,
obj: T | null,
headers: IBrokerMessageHeaders = this.connection.createHeaders()
): Promise<string> {
return await this.connection.send(this.topicName, key, this.stringify(obj), headers);
}

protected async sendClusterRequest(
key: string,
obj: T | null,
timeout: number = 0,
targetClusters: Set<string> = new Set(),
expectedResponses: number | null = null,
messageCallback: BrokerMessageListener<T> | null = null
): Promise<ClusterResult<T>> {
const responseKey = this.toResponseKey(key);

const responses: Map<string, BrokerMessage<T>> = new Map();
const latch = new CountDownLatch(expectedResponses ?? targetClusters.size);
let requestId = "";

const cb: BrokerEventListener<T> = (msg: BrokerMessage<T>) => {
if (msg.headers.requestId !== requestId) {
return;
}
messageCallback?.(msg.headers.sourceCluster, msg)?.catch(e =>
Logger.error(TAG, "Uncaught error in sendClusterRequest message callback", e)
);

responses.set(msg.headers.sourceCluster, msg);
};

this.on(responseKey, cb);
let timeoutReached = false;
try {
requestId = await this.send(key, obj, this.connection.createHeaders(targetClusters));

timeoutReached = !(await latch.await(timeout));
} finally {
this.off(responseKey, cb);
}

return {
responses,
timeout: timeoutReached,
};
}

protected on(key: string, cb: BrokerEventListener<T>): void {
let listeners = this.keyListeners.get(key);
if (!listeners) {
listeners = new Set();
this.keyListeners.set(key, listeners);
}
listeners.add(cb);
}

protected off(key: string, cb: BrokerEventListener<T>): void {
const listeners = this.keyListeners.get(key);
if (listeners) {
listeners.delete(cb);
if (!listeners.size) {
this.keyListeners.delete(key);
}
}
}

// For internal use only. Do not call externally!
public async _respond(msg: BrokerMessage<T>, data: T | null): Promise<void> {
const newHeaders = this.connection.createHeaders(new Set([msg.headers.sourceCluster]), msg.headers.requestId);
await this.send(this.toResponseKey(msg.key), data, newHeaders);
}

private parse(json: string): T | null {
return JSON.parse(json) as T | null;
}

private stringify(obj: T | null): string {
return JSON.stringify(obj);
}

private async onTopicMessage(key: string, value: string, headers: IBrokerMessageHeaders): Promise<void> {
const obj = this.parse(value);
const msg = new BrokerMessage(this, key, obj, headers);
const listeners = this.keyListeners.get(key);
if (!listeners || !listeners.size) {
return;
}
for (const listener of listeners) {
listener(msg)?.catch(e => Logger.error(TAG, "Uncaught error in BrokerClient listener", e));
}
}

private toResponseKey(key: string): string {
return `${key}-response`;
}
class KeyMetadata {

public readonly producers: Set<ProducerSubclient<any>> = new Set();
public readonly consumers: Set<ConsumerSubclient<any>> = new Set();

public constructor(
public readonly key: string,
) { }

public get isEmpty(): boolean {
return this.producers.size === 0 && this.consumers.size === 0;
}

public destroy(): void {
this.producers.forEach(producer => producer.destroy());
this.consumers.forEach(consumer => consumer.destroy());
this.producers.clear();
this.consumers.clear();
}
}
Loading

0 comments on commit 3cade2a

Please sign in to comment.