Skip to content

Commit

Permalink
pending: converting rust client to existing resolved event.
Browse files Browse the repository at this point in the history
  • Loading branch information
YoEight committed Jan 31, 2025
1 parent bd1f722 commit f2f094a
Show file tree
Hide file tree
Showing 15 changed files with 1,209 additions and 1,327 deletions.
56 changes: 0 additions & 56 deletions packages/db-client-bridge/lib/index.cjs

This file was deleted.

45 changes: 0 additions & 45 deletions packages/db-client-bridge/lib/index.d.cts

This file was deleted.

1 change: 0 additions & 1 deletion packages/db-client-bridge/lib/index.d.mts

This file was deleted.

2 changes: 0 additions & 2 deletions packages/db-client-bridge/lib/index.mjs

This file was deleted.

18 changes: 0 additions & 18 deletions packages/db-client-bridge/lib/load.cjs

This file was deleted.

1 change: 0 additions & 1 deletion packages/db-client-bridge/lib/load.d.cts

This file was deleted.

2 changes: 1 addition & 1 deletion packages/db-client-bridge/lib/src/index.cjs.map

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions packages/db-client-bridge/lib/src/index.d.cts
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ export type Iterable = {
}>;
};
export type RustClient = {
readStream(stream: string, options: ReadStreamOptions): Promise<AsyncIterable<ResolvedEvent>>;
readStream(stream: string, options: RustReadStreamOptions): Promise<AsyncIterable<ResolvedEvent>>;
};
export type RawClient = {
readStream(stream: string, options: ReadStreamOptions): Promise<Iterable>;
readStream(stream: string, options: RustReadStreamOptions): Promise<Iterable>;
};
export type ReadStreamOptions = {
fromRevision: bigint;
export type RustReadStreamOptions = {
fromRevision: bigint | string;
direction: string;
maxCount: bigint;
requiresLeader: boolean;
resolveLinks: boolean;
};
export type ResolvedEvent = {
event?: RecordedEvent;
Expand All @@ -39,7 +40,4 @@ export type Position = {
commit: bigint;
prepare: bigint;
};
export type Greeting = {
message: string;
};
export declare function createClient(connStr: string): RustClient;
12 changes: 6 additions & 6 deletions packages/db-client-bridge/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@eventstore/db-client-bridge",
"version": "0.1.0",
"description": "A NodeJS binding for the EventStore Rust client",
"main": "./lib/index.cjs",
"main": "./lib/src/index.cjs",
"scripts": {
"test": "tsc &&cargo test",
"cargo-build": "tsc && cargo build --message-format=json-render-diagnostics > cargo.log",
Expand All @@ -22,16 +22,16 @@
"exports": {
".": {
"import": {
"types": "./lib/index.d.mts",
"default": "./lib/index.mjs"
"types": "./lib/src/index.d.mts",
"default": "./lib/src/index.mjs"
},
"require": {
"types": "./lib/index.d.cts",
"default": "./lib/index.cjs"
"types": "./lib/src/index.d.cts",
"default": "./lib/src/index.cjs"
}
}
},
"types": "./lib/index.d.cts",
"types": "./lib/src/index.d.cts",
"files": [
"lib/**/*.?({c,m}){t,j}s"
],
Expand Down
34 changes: 24 additions & 10 deletions packages/db-client-bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,22 @@ pub fn read_stream(client: Client, mut cx: FunctionContext) -> JsResult<JsPromis
x => cx.throw_error(format!("invalid direction value: '{}'", x))?,
};

let options = match params
.get::<JsBigInt, _, _>(&mut cx, "fromRevision")?
.to_u64(&mut cx)
let options = if let Ok(value) = params
.get_value(&mut cx, "fromRevision")?
.downcast::<JsString, _>(&mut cx)
{
Ok(r) => {
if r == 0 {
options.position(StreamPosition::Start)
} else {
options.position(StreamPosition::Position(r))
}
match value.value(&mut cx).as_str() {
"start" => options.position(StreamPosition::Start),
"end" => options.position(StreamPosition::End),
x => cx.throw_error(format!("invalid fromRevision value: '{}'", x))?,
}
Err(e) => cx.throw_error(e.to_string())?,
} else if let Ok(value) = params.get::<JsBigInt, _, _>(&mut cx, "fromRevision") {
match value.to_u64(&mut cx) {
Ok(r) => options.position(StreamPosition::Position(r)),
Err(e) => cx.throw_error(e.to_string())?,
}
} else {
cx.throw_error("fromRevision can only be 'start', 'end' or a bigint")?
};

let options = match params
Expand All @@ -78,6 +82,16 @@ pub fn read_stream(client: Client, mut cx: FunctionContext) -> JsResult<JsPromis
.value(&mut cx);
let options = options.requires_leader(require_leader);

let resolve_links = params
.get::<JsBoolean, _, _>(&mut cx, "resolvesLink")?
.value(&mut cx);

let options = if resolve_links {
options.resolve_link_tos()
} else {
options
};

let (deferred, promise) = cx.promise();
let channel = cx.channel();

Expand Down
15 changes: 6 additions & 9 deletions packages/db-client-bridge/src/index.cts
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ export type Iterable = {
}

export type RustClient = {
readStream(stream: string, options: ReadStreamOptions): Promise<AsyncIterable<ResolvedEvent>>;
readStream(stream: string, options: RustReadStreamOptions): Promise<AsyncIterable<ResolvedEvent>>;
};

export type RawClient = {
readStream(stream: string, options: ReadStreamOptions): Promise<Iterable>;
readStream(stream: string, options: RustReadStreamOptions): Promise<Iterable>;
}

export type ReadStreamOptions = {
fromRevision: bigint;
export type RustReadStreamOptions = {
fromRevision: bigint | string;
direction: string;
maxCount: bigint;
requiresLeader: boolean;
resolveLinks: boolean;
};

export type ResolvedEvent = {
Expand All @@ -51,15 +52,11 @@ export type Position = {
prepare: bigint;
};

export type Greeting = {
message: string
};

export function createClient(connStr: string): RustClient {
const client = addon.createClient(connStr);

return {
async readStream(stream: string, options: ReadStreamOptions): Promise<AsyncIterable<ResolvedEvent>> {
async readStream(stream: string, options: RustReadStreamOptions): Promise<AsyncIterable<ResolvedEvent>> {
const iterable = await client.readStream(stream, options);

return {
Expand Down
17 changes: 17 additions & 0 deletions packages/db-client/src/Client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import {
ClientReadableStreamImpl,
} from "@grpc/grpc-js/build/src/call";

import * as bridge from "@eventstore/db-client-bridge";

import type {
NodePreference,
GRPCClientConstructor,
EndPoint,
Credentials,
BaseOptions,
} from "../types";

import {
CancelledError,
convertToCommandError,
Expand Down Expand Up @@ -145,6 +148,7 @@ interface NextChannelSettings {
}

export class Client {
#rustClient: bridge.RustClient;
#throwOnAppendFailure: boolean;
#connectionSettings: ConnectionSettings;
#channelCredentials: ChannelCredentials;
Expand Down Expand Up @@ -237,6 +241,7 @@ export class Client {
channelCredentials.userCertFile = readFileSync(certPathResolved);
}

const rustClient = bridge.createClient(string);
if (options.dnsDiscover) {
const [discover] = options.hosts;

Expand All @@ -247,6 +252,7 @@ export class Client {
}

return new Client(
rustClient,
{
discover,
nodePreference: options.nodePreference,
Expand All @@ -266,6 +272,7 @@ export class Client {

if (options.hosts.length > 1) {
return new Client(
rustClient,
{
endpoints: options.hosts,
nodePreference: options.nodePreference,
Expand All @@ -284,6 +291,7 @@ export class Client {
}

return new Client(
rustClient,
{
endpoint: options.hosts[0],
throwOnAppendFailure: options.throwOnAppendFailure,
Expand All @@ -298,21 +306,25 @@ export class Client {
}

constructor(
rustClient: bridge.RustClient,
connectionSettings: DNSClusterOptions,
channelCredentials?: ChannelCredentialOptions,
defaultUserCredentials?: Credentials
);
constructor(
rustClient: bridge.RustClient,
connectionSettings: GossipClusterOptions,
channelCredentials?: ChannelCredentialOptions,
defaultUserCredentials?: Credentials
);
constructor(
rustClient: bridge.RustClient,
connectionSettings: SingleNodeOptions,
channelCredentials?: ChannelCredentialOptions,
defaultUserCredentials?: Credentials
);
constructor(
rustClient: bridge.RustClient,
{
throwOnAppendFailure = true,
keepAliveInterval = 10_000,
Expand Down Expand Up @@ -354,6 +366,7 @@ export class Client {
);
}

this.#rustClient = rustClient;
this.#throwOnAppendFailure = throwOnAppendFailure;
this.#keepAliveInterval = keepAliveInterval;
this.#keepAliveTimeout = keepAliveTimeout;
Expand Down Expand Up @@ -692,4 +705,8 @@ export class Client {
protected get throwOnAppendFailure(): boolean {
return this.#throwOnAppendFailure;
}

protected get rustClient(): bridge.RustClient {
return this.#rustClient;
}
}
Loading

0 comments on commit f2f094a

Please sign in to comment.