From 30ef8d2ccb8725b0165752e3728d8681abfa78c2 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Wed, 22 May 2024 11:53:26 -0400 Subject: [PATCH 1/7] client lib --- js/flow/package.json | 6 ++ js/flow/src/client/client.ts | 146 +++++++++++++++++++++++++++++++++++ js/flow/src/client/index.ts | 1 + 3 files changed, 153 insertions(+) create mode 100644 js/flow/src/client/client.ts create mode 100644 js/flow/src/client/index.ts diff --git a/js/flow/package.json b/js/flow/package.json index eb0dd8134..accf98302 100644 --- a/js/flow/package.json +++ b/js/flow/package.json @@ -60,6 +60,12 @@ "import": "./lib/index.mjs", "default": "./lib/index.js" }, + "./client": { + "types": "./lib/client/index.d.ts", + "require": "./lib/client/index.js", + "import": "./lib/client/index.mjs", + "default": "./lib/client/index.js" + }, "./experimental": { "types": "./lib/experimental.d.ts", "require": "./lib/experimental.js", diff --git a/js/flow/src/client/client.ts b/js/flow/src/client/client.ts new file mode 100644 index 000000000..3a97083aa --- /dev/null +++ b/js/flow/src/client/client.ts @@ -0,0 +1,146 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +const __flowStreamDelimiter = '\n'; + +type FlowUrl = { + name: string; + projectId: string; + region?: string; + isFirebaseEmulator?: boolean; +}; + +export function streamFlow({ + url, + payload, + headers, +}: { + url: string | FlowUrl; + payload: any; + headers?: Record; +}) { + let chunkStreamController: ReadableStreamDefaultController | undefined = + undefined; + const chunkStream = new ReadableStream({ + start(controller) { + chunkStreamController = controller; + }, + pull() {}, + cancel() {}, + }); + + let flowUrl: string; + if (typeof url === 'string') { + flowUrl = url; + } else { + const region = url.region ?? 'us-central1'; + flowUrl = url.isFirebaseEmulator + ? `http://127.0.0.1:5001/${url.projectId}/${region}/${url.name}` + : `https://${region}-${url.projectId}.cloudfunctions.net/${url.name}`; + } + + const operationPromise = __flowRunEnvelope({ + url: flowUrl, + payload, + streamingCallback: (c) => { + chunkStreamController?.enqueue(c); + }, + headers, + }); + operationPromise.then((o) => { + chunkStreamController?.close(); + return o; + }); + + return { + output() { + return operationPromise.then((op) => { + if (!op.done) { + throw new Error(`flow ${op.name} did not finish execution`); + } + if (op.result?.error) { + throw new Error( + `${op.name}: ${op.result?.error}\n${op.result?.stacktrace}` + ); + } + return op.result?.response; + }); + }, + async *stream() { + const reader = chunkStream.getReader(); + while (true) { + const chunk = await reader.read(); + if (chunk.value) { + yield chunk.value; + } + if (chunk.done) { + break; + } + } + return await operationPromise; + }, + }; +} + +async function __flowRunEnvelope({ + url, + payload, + streamingCallback, + headers, +}: { + url: string; + payload: any; + streamingCallback: (chunk: any) => void; + headers?: Record; +}) { + let response; + response = await fetch(url + '?stream=true', { + method: 'POST', + body: JSON.stringify({ + data: payload, + }), + headers: { + 'Content-Type': 'application/json', + ...headers, + }, + }); + if (!response.body) { + throw new Error('Response body is empty'); + } + var reader = response.body.getReader(); + var decoder = new TextDecoder(); + + let buffer = ''; + while (true) { + const result = await reader.read(); + const decodedValue = decoder.decode(result.value); + if (decodedValue) { + buffer += decodedValue; + } + // If buffer includes the delimiter that means we are still recieving chunks. + while (buffer.includes(__flowStreamDelimiter)) { + streamingCallback( + JSON.parse(buffer.substring(0, buffer.indexOf(__flowStreamDelimiter))) + ); + buffer = buffer.substring( + buffer.indexOf(__flowStreamDelimiter) + __flowStreamDelimiter.length + ); + } + if (result.done) { + return JSON.parse(buffer); + } + } +} diff --git a/js/flow/src/client/index.ts b/js/flow/src/client/index.ts new file mode 100644 index 000000000..d13e8ab44 --- /dev/null +++ b/js/flow/src/client/index.ts @@ -0,0 +1 @@ +export { streamFlow } from './client.js'; From d9a5bbd080892324fda3195032db785afba82df8 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Thu, 23 May 2024 09:48:09 -0400 Subject: [PATCH 2/7] export types, keep it simple --- js/flow/package.json | 3 +++ js/flow/src/client/client.ts | 21 ++------------------- 2 files changed, 5 insertions(+), 19 deletions(-) diff --git a/js/flow/package.json b/js/flow/package.json index 59749ba8c..16123a305 100644 --- a/js/flow/package.json +++ b/js/flow/package.json @@ -77,6 +77,9 @@ "*": { "experimental": [ "lib/experimental" + ], + "client": [ + "lib/client" ] } } diff --git a/js/flow/src/client/client.ts b/js/flow/src/client/client.ts index 3a97083aa..f29938b7e 100644 --- a/js/flow/src/client/client.ts +++ b/js/flow/src/client/client.ts @@ -16,19 +16,12 @@ const __flowStreamDelimiter = '\n'; -type FlowUrl = { - name: string; - projectId: string; - region?: string; - isFirebaseEmulator?: boolean; -}; - export function streamFlow({ url, payload, headers, }: { - url: string | FlowUrl; + url: string; payload: any; headers?: Record; }) { @@ -42,18 +35,8 @@ export function streamFlow({ cancel() {}, }); - let flowUrl: string; - if (typeof url === 'string') { - flowUrl = url; - } else { - const region = url.region ?? 'us-central1'; - flowUrl = url.isFirebaseEmulator - ? `http://127.0.0.1:5001/${url.projectId}/${region}/${url.name}` - : `https://${region}-${url.projectId}.cloudfunctions.net/${url.name}`; - } - const operationPromise = __flowRunEnvelope({ - url: flowUrl, + url, payload, streamingCallback: (c) => { chunkStreamController?.enqueue(c); From 4cc41e852572b95482a37dfec9e79287dfe492c4 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Thu, 23 May 2024 10:30:13 -0400 Subject: [PATCH 3/7] format --- js/flow/src/client/client.ts | 12 ++++++------ js/flow/src/client/index.ts | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 6 deletions(-) diff --git a/js/flow/src/client/client.ts b/js/flow/src/client/client.ts index f29938b7e..f25e5bfcb 100644 --- a/js/flow/src/client/client.ts +++ b/js/flow/src/client/client.ts @@ -18,11 +18,11 @@ const __flowStreamDelimiter = '\n'; export function streamFlow({ url, - payload, + input, headers, }: { url: string; - payload: any; + input: any; headers?: Record; }) { let chunkStreamController: ReadableStreamDefaultController | undefined = @@ -37,7 +37,7 @@ export function streamFlow({ const operationPromise = __flowRunEnvelope({ url, - payload, + input, streamingCallback: (c) => { chunkStreamController?.enqueue(c); }, @@ -80,12 +80,12 @@ export function streamFlow({ async function __flowRunEnvelope({ url, - payload, + input, streamingCallback, headers, }: { url: string; - payload: any; + input: any; streamingCallback: (chunk: any) => void; headers?: Record; }) { @@ -93,7 +93,7 @@ async function __flowRunEnvelope({ response = await fetch(url + '?stream=true', { method: 'POST', body: JSON.stringify({ - data: payload, + data: input, }), headers: { 'Content-Type': 'application/json', diff --git a/js/flow/src/client/index.ts b/js/flow/src/client/index.ts index d13e8ab44..ea08a4bfd 100644 --- a/js/flow/src/client/index.ts +++ b/js/flow/src/client/index.ts @@ -1 +1,17 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + export { streamFlow } from './client.js'; From 48ce3ce0ccee98bd693baba8148f5435bfe4cfc7 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Thu, 23 May 2024 13:39:19 -0400 Subject: [PATCH 4/7] added jsdoc --- js/flow/src/client/client.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/js/flow/src/client/client.ts b/js/flow/src/client/client.ts index f25e5bfcb..7902dcdd2 100644 --- a/js/flow/src/client/client.ts +++ b/js/flow/src/client/client.ts @@ -16,6 +16,24 @@ const __flowStreamDelimiter = '\n'; +/** + * Invoke and stream response from a deployed flow. + * + * For example: + * + * ```js + * import { streamFlow } from '@genkit-ai/flow/client'; + * + * const response = streamFlow({ + * url: 'https://my-flow-deployed-url', + * input: 'foo', + * }); + * for await (const chunk of response.stream()) { + * console.log(chunk); + * } + * console.log(await response.output()); + * ``` + */ export function streamFlow({ url, input, From e05793cebc44183bf94914e5058612e543eb963e Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Fri, 28 Jun 2024 15:37:36 -0400 Subject: [PATCH 5/7] cleanup --- js/flow/src/client/client.ts | 38 ++++++++++++++++++++++++++++++++++++ js/flow/src/client/index.ts | 2 +- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/js/flow/src/client/client.ts b/js/flow/src/client/client.ts index 7902dcdd2..036d342bb 100644 --- a/js/flow/src/client/client.ts +++ b/js/flow/src/client/client.ts @@ -145,3 +145,41 @@ async function __flowRunEnvelope({ } } } + +/** + * Invoke and stream response from a deployed flow. + * + * For example: + * + * ```js + * import { runFlow } from '@genkit-ai/flow/client'; + * + * const response = await runFlow({ + * url: 'https://my-flow-deployed-url', + * input: 'foo', + * }); + * console.log(await response); + * ``` + */ +export async function runFlow({ + url, + payload, + headers, +}: { + url: string; + payload?: any; + headers?: Record; +}) { + const response = await fetch(url, { + method: 'POST', + body: JSON.stringify({ + data: payload, + }), + headers: { + 'Content-Type': 'application/json', + ...headers, + }, + }); + const wrappedDesult = await response.json(); + return wrappedDesult.result; +} diff --git a/js/flow/src/client/index.ts b/js/flow/src/client/index.ts index ea08a4bfd..a7064ec87 100644 --- a/js/flow/src/client/index.ts +++ b/js/flow/src/client/index.ts @@ -14,4 +14,4 @@ * limitations under the License. */ -export { streamFlow } from './client.js'; +export { streamFlow, runFlow } from './client.js'; From 80a541c83b696f7e2be3180eb0cea48f45701067 Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Fri, 28 Jun 2024 15:40:12 -0400 Subject: [PATCH 6/7] format --- js/flow/src/client/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/flow/src/client/index.ts b/js/flow/src/client/index.ts index a7064ec87..154de1604 100644 --- a/js/flow/src/client/index.ts +++ b/js/flow/src/client/index.ts @@ -14,4 +14,4 @@ * limitations under the License. */ -export { streamFlow, runFlow } from './client.js'; +export { runFlow, streamFlow } from './client.js'; From 9e565a62562a2bbb3a25d49f11e564d6e4393b8e Mon Sep 17 00:00:00 2001 From: Pavel Jbanov Date: Fri, 28 Jun 2024 15:43:51 -0400 Subject: [PATCH 7/7] doc fix --- js/flow/src/client/client.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js/flow/src/client/client.ts b/js/flow/src/client/client.ts index 036d342bb..66fc44e33 100644 --- a/js/flow/src/client/client.ts +++ b/js/flow/src/client/client.ts @@ -147,7 +147,7 @@ async function __flowRunEnvelope({ } /** - * Invoke and stream response from a deployed flow. + * Invoke a deployed flow over HTTP(s). * * For example: *